about summary refs log tree commit diff
path: root/saneterm/pty.py
blob: fd423c68cd923957f94e3a136e5bf543bc84b867 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
import os
import re

from pty import fork
from .color import Color, ColorType, BasicColor
from enum import Enum, auto
from gi.repository import GLib, Pango

# Value exported as the TERM environment variable of the child
# process spawned in Source.prepare().
TERM = "dumb"

class Source(GLib.Source):
    """
    GLib event source which runs a command on a freshly forked
    pseudo-terminal and watches the master side of that pty.

    The child process is spawned lazily on the first call to
    prepare(). Afterwards the master file descriptor is registered
    with the source for IN and HUP conditions and handed to the
    callback on dispatch.

    Attributes:

    * cmd: argument vector of the command to run (cmd[0] is the
      program which is looked up via os.execvp())
    * master: file descriptor of the pty master, -1 until the
      child has been forked
    * tag: value returned by add_unix_fd() for the master fd,
      None until the child has been forked
    """
    master = -1

    def __init__(self, cmd):
        GLib.Source.__init__(self)
        self.cmd = cmd
        self.tag = None

    def prepare(self):
        """
        Fork the child process on first invocation and register
        the pty master with this source. Always returns (False, -1).
        NOTE(review): presumably this means “not ready, no timeout”
        to GLib -- confirm against the GLib.Source documentation.
        """
        if self.master != -1:
            # child has already been spawned
            return False, -1

        pid, self.master = fork()
        if pid == 0:
            # We are in the child. It must never return into the
            # parent's code (which would leave us with a second copy
            # of the application running on the pty), so whatever
            # goes wrong below ends in os._exit().
            try:
                # Terminal options enforced by saneterm.
                # Most importantly, local echo is disabled. Instead we show
                # characters on input directly in the GTK termview/TextBuffer.
                os.system("stty -onlcr -echo")

                os.environ["TERM"] = TERM
                os.execvp(self.cmd[0], self.cmd)
            except Exception as exc:
                # stderr of the child is the pty, so the message
                # ends up in the terminal view of the parent.
                message = "saneterm: failed to execute command: {}\n".format(exc)
                os.write(2, message.encode())
            finally:
                # only reached if execvp() did not replace the process
                os._exit(127)

        events = GLib.IOCondition.IN|GLib.IOCondition.HUP
        self.tag = self.add_unix_fd(self.master, events)

        return False, -1

    def check(self):
        """Always returns False."""
        return False

    def dispatch(self, callback, args):
        """
        Invoke callback with this source, the tag of the registered
        file descriptor and the pty master fd. The callback's return
        value is passed through; args is not used.
        """
        return callback(self, self.tag, self.master)

class EventType(Enum):
    """
    Kinds of events emitted by Parser.parse(). Each event is a
    tuple of the form (EventType, payload).
    """
    # Plain text; the payload is a slice of the parsed string
    TEXT = auto()
    # The bell character was encountered; the payload is None
    BELL = auto()
    # An SGR sequence changed the text style; the payload is a
    # tuple of the form (TextStyleChange, associated value)
    TEXT_STYLE = auto()

class TextStyleChange(Enum):
    """
    Each TextStyleChange describes one way in which escape
    sequences may influence how text is displayed.
    Together with an associated value (True or False,
    a color, an enum from Pango, …) a TextStyleChange
    represents the actual impact an escape sequence
    has on the font rendering.

    The important invariant here is that all changes
    represented by the same TextStyleChange are
    _mutually exclusive_:
    E. g. (TextStyleChange.ITALIC, True) and
    (TextStyleChange.ITALIC, False) can't be applied at
    the same time — one will replace the other.
    This invariant greatly simplifies state tracking.
    """

    # Resets the display style to an arbitrary default.
    # No associated value (None is used)
    RESET = auto()
    # Enables/disables italic text
    # associated with a boolean
    ITALIC = auto()
    # Enables/disables text being crossed out
    # associated with a boolean
    STRIKETHROUGH = auto()
    # Describes the weight of the font to use
    # associated with a Pango.Weight enum
    WEIGHT = auto()
    # Disables or enables an underline style
    # associated with a Pango.Underline enum
    UNDERLINE = auto()
    # Hides/shows the text. If hidden, it should
    # not be readable, but in many implementations
    # the text can still be copied.
    # associated with a boolean
    CONCEALED = auto()
    # Sets the text's color or resets
    # it to a default if None.
    # associated with either None or a Color
    FOREGROUND_COLOR = auto()
    # Sets the text's background color or resets
    # it to a default if None.
    # associated with either None or a Color
    BACKGROUND_COLOR = auto()

class PositionedIterator(object):
    """
    Iterator over a string which keeps an explicit index into the
    string instead of relying on the built-in string iterator.

    Tracking the index makes a few extra operations possible:

    * empty() tells whether any elements are left without
      consuming one
    * waypoint() / backtrack() allow returning to an earlier
      position

    Public attributes:

    * pos: index of the element most recently returned by
      __next__() (-1 if nothing has been returned yet)
    * wrapped: the string the iterator was constructed from
    """
    def __init__(self, s):
        self.wrapped = s
        # index of the element just handed out by __next__()
        self.pos = -1
        # stack of positions backtrack() returns to
        self.waypoints = []

    def waypoint(self):
        """
        Remember a position backtrack() can return to later on.

        The remembered position is the one *before* the element
        last received via __next__(): after a later backtrack()
        the next element produced is again the element which was
        the most recent one when waypoint() was called.

        As a consequence pos right after waypoint() is greater
        than pos right after the matching backtrack(), which may
        be surprising at first. It allows deciding whether to set
        a waypoint after having looked at an element, which is
        handy in parsers:

        def example(s):
          it = PositionedIterator(s)

          ignore_colon = False

          for x in it:
            if ignore_colon:
              ignore_colon = False
              # do nothing
            elif x == ':':
              it.waypoint()

              if it.next() == ' ':
                # do stuff …
              else:
                it.backtrack()
                ignore_colon = True
        """
        # TODO: maybe don't support calling waypoint if pos == -1
        previous = self.pos - 1 if self.pos > 0 else -1
        self.waypoints.append(previous)

    def backtrack(self):
        """Return to the most recent waypoint, see waypoint()."""
        self.pos = self.waypoints.pop()

    def next(self):
        """Alias for __next__()."""
        return self.__next__()

    def take(self, n):
        """
        Consume n elements and return them as a slice of the
        wrapped string. Raises StopIteration if fewer than n
        elements are left.
        """
        first = self.pos + 1

        for _ in range(n):
            self.__next__()

        return self.wrapped[first:self.pos + 1]

    def takewhile_greedy(self, f):
        """
        Consume elements as long as the predicate f returns True
        for them and return the consumed elements as a slice of
        the wrapped string. The first element f rejects is not
        consumed.

        The predicate has to return False at least once before
        the input ends, otherwise StopIteration is raised. It is
        therefore only useful if whatever you're parsing is
        terminated in some way:

        def example(s):
          it = PositionedIterator(s)
          foo = it.takewhile_greedy(lambda x: x != ';')

        example("foo")  # fails
        example("foo;") # succeeds, but doesn't consume ';'

        (In a real example you'd also consume the semicolon)
        """
        current = self.__next__()
        begin = self.pos

        while f(current):
            current = self.__next__()

        # pos now refers to the rejected element: un-consume it
        stop = self.pos
        self.pos = stop - 1

        return self.wrapped[begin:stop]

    def empty(self):
        """
        Tell whether the iterator is exhausted without
        consuming an element.
        """
        return len(self.wrapped) - self.pos == 1

    def __iter__(self):
        return self

    def __next__(self):
        self.pos += 1

        try:
            element = self.wrapped[self.pos]
        except IndexError:
            # stay on the last valid element
            self.pos -= 1
            raise StopIteration
        else:
            return element

def csi_parameter_byte(c):
    """
    Tell whether the unicode character c is a parameter byte
    of a CSI sequence, i. e. lies in the range 0x30 to 0x3f.
    See ECMA-48 (5th edition) Section 5.4.
    """
    return 0x30 <= ord(c) <= 0x3f

def csi_intermediate_byte(c):
    """
    Tell whether the unicode character c is an intermediate byte
    of a CSI sequence, i. e. lies in the range 0x20 to 0x2f.
    See ECMA-48 (5th edition) Section 5.4.
    """
    return 0x20 <= ord(c) <= 0x2f

def csi_final_byte(c):
    """
    Tell whether the unicode character c is a final byte
    of a CSI sequence, i. e. lies in the range 0x40 to 0x7e.
    See ECMA-48 (5th edition) Section 5.4.
    """
    return 0x40 <= ord(c) <= 0x7e

def parse_extended_color(iterator):
    """
    Parse extended color sequences (CSI [ 38 and CSI [ 48).
    Takes an iterator which has already consumed the initial
    SGR sequence type argument and yields the remaining
    arguments as strings. The iterator is consumed completely.

    Returns a Color, or None if the color should be reset
    to the default. On failure an AssertionError is raised.

    Relevant standards:
    * Definition of the SGR extended color escape sequence:
      ITU-T Rec. T.416 | ISO/IEC 8613-6
      https://www.itu.int/rec/dologin_pub.asp?lang=e&id=T-REC-T.416-199303-I!!PDF-E&type=items
    * Full definition of the colour specification including the “colour space id”:
      ITU-T Rec. T.412 | ISO/IEC 8613-2
      https://www.itu.int/rec/dologin_pub.asp?lang=e&id=T-REC-T.412-199303-I!!PDF-E&type=items
    """
    args = list(iterator)

    try:
        if args[0] == '5':
            # 256 color
            # Raised explicitly instead of using an assert statement:
            # those are stripped when running with -O which would
            # silently accept malformed sequences.
            if len(args) != 2:
                raise AssertionError("unexpected number of arguments")

            return Color(
                ColorType.NUMBERED_256,
                int(args[1])
            )
        elif args[0] == '2':
            # truecolor
            if len(args) == 4:
                channels = args[1:4]
            elif len(args) >= 5:
                # args[1] is the color space id which is skipped
                # TODO: handle color space id and tolerance values
                channels = args[2:5]
            else:
                raise AssertionError("too few arguments")

            return Color(
                ColorType.TRUECOLOR,
                tuple(int(c) for c in channels)
            )
        elif args[0] == '0':
            # The standard specifies this as “implementation defined”,
            # so we define this as color reset
            return None
        else:
            # TODO: support
            #
            #   1   transparent
            #   3   CMY
            #   4   CMYK
            #
            # … but who needs these?
            raise AssertionError("unsupported extended color")

    # convert a few exceptions that can happen while parsing
    # into AssertionErrors to indicate that they are
    # “expected” parse failures
    except IndexError as err:
        # args[0] out of range
        raise AssertionError("too few arguments") from err

    except TypeError as err:
        # raised in Color.__init__()
        raise AssertionError("malformed color") from err

    except ValueError as err:
        # raised by usage of int()
        raise AssertionError("unexpected non-integer") from err


def parse_sgr_sequence(params, special_evs):
    """
    SGR (Select Graphic Rendition) sequence:
    any number of numbers separated by ';'
    which change the current text presentation.
    If the parameter string is empty, a single '0'
    is implied.

    For every supported parameter an event of the form
    (EventType.TEXT_STYLE, (TextStyleChange, value)) is
    appended to special_evs; unsupported parameters are
    skipped. If a parameter is not an integer, an
    AssertionError is raised and special_evs is left
    untouched, so a rejected sequence is never applied
    partially.

    We support a subset of the core SGR sequences
    as specified by ECMA-48. Most notably we also
    support the common additional bright color
    sequences. This also justifies not to implement
    the strange behavior of choosing brighter colors
    when the current text is bold.

    We also support ':' as a separator which is
    only necessary for extended color sequences
    as specified in ITU-T Rec. T.416 | ISO/IEC 8613-6
    (see also parse_extended_color()). A colon
    separates the number identifying the kind of
    sequence from its arguments. These arguments
    belong to that one parameter only, contrary to
    parameters separated by ';' which are treated as
    if they were in separate sequences. Both kinds
    may be mixed, e. g. "1;38:5:196;4".

    The sequences specified in ITU-T Rec. T.416
    must use colons as separators. In reality however
    CLIs often use semicolons instead, so we support
    both: if 38 or 48 is not followed by a colon, all
    remaining parameters are taken to be its arguments.
    This can cause issues in edge cases, but
    is probably better than some applications not
    working properly.
    """
    # Events are collected here and only handed over after
    # the whole parameter string has been parsed successfully.
    events = []

    params_it = iter(params.split(';'))
    for param in params_it:
        # Arguments given after a colon may not be interpreted
        # as normal SGR parameters, so they are split off here.
        p, *sub_params = param.split(':')

        if not p:
            # empty implies 0
            sgr_type = 0
        else:
            try:
                sgr_type = int(p)
            except ValueError:
                raise AssertionError("Invalid Integer") from None

        change_payload = None

        # Not supported:
        #   5-6     blink
        #   7       invert
        #   10      default font
        #   11-19   alternative font
        #   20      blackletter font
        #   25      disable blinking
        #   26      proportional spacing
        #   27      disable inversion
        #   50      disable proportional spacing
        #   51      framed
        #   52      encircled
        #   53      overlined (TODO: implement via GTK 4 TextTag)
        #   54      neither framed nor encircled
        #   55      not overlined
        #   60-65   ideograms (TODO: find out what this is supposed to do)
        #   58-59   underline color, non-standard
        #   73-75   sub/superscript, non-standard (TODO: via scale and rise)
        if sgr_type == 0:
            change_payload = (TextStyleChange.RESET, None)
        elif sgr_type == 1:
            change_payload = (TextStyleChange.WEIGHT, Pango.Weight.BOLD)
        elif sgr_type == 2:
            change_payload = (TextStyleChange.WEIGHT, Pango.Weight.THIN)
        elif sgr_type == 3:
            change_payload = (TextStyleChange.ITALIC, True)
        elif sgr_type == 4:
            change_payload = (TextStyleChange.UNDERLINE, Pango.Underline.SINGLE)
        elif sgr_type == 8:
            change_payload = (TextStyleChange.CONCEALED, True)
        elif sgr_type == 9:
            change_payload = (TextStyleChange.STRIKETHROUGH, True)
        elif sgr_type == 21:
            change_payload = (TextStyleChange.UNDERLINE, Pango.Underline.DOUBLE)
        elif sgr_type == 22:
            change_payload = (TextStyleChange.WEIGHT, Pango.Weight.NORMAL)
        elif sgr_type == 23:
            # also theoretically should disable blackletter
            change_payload = (TextStyleChange.ITALIC, False)
        elif sgr_type == 24:
            change_payload = (TextStyleChange.UNDERLINE, Pango.Underline.NONE)
        elif sgr_type == 28:
            change_payload = (TextStyleChange.CONCEALED, False)
        elif sgr_type == 29:
            change_payload = (TextStyleChange.STRIKETHROUGH, False)
        elif 30 <= sgr_type <= 37:
            change_payload = (
                TextStyleChange.FOREGROUND_COLOR,
                Color(
                    ColorType.NUMBERED_8,
                    BasicColor(sgr_type - 30)
                )
            )
        elif sgr_type == 38 or sgr_type == 48:
            if sgr_type == 38:
                target = TextStyleChange.FOREGROUND_COLOR
            else:
                target = TextStyleChange.BACKGROUND_COLOR

            if sub_params:
                # standard form: arguments are attached via ':'
                color_args = iter(sub_params)
            else:
                # common non-standard form: arguments follow as
                # ordinary parameters. We can't know how many of them
                # belong to the color, so all remaining ones are used.
                color_args = (
                    arg
                    for rest in params_it
                    for arg in rest.split(':')
                )

            try:
                change_payload = (target, parse_extended_color(color_args))
            except AssertionError:
                # TODO: maybe fail here?
                pass
        elif sgr_type == 39:
            change_payload = (TextStyleChange.FOREGROUND_COLOR, None)
        elif 40 <= sgr_type <= 47:
            change_payload = (
                TextStyleChange.BACKGROUND_COLOR,
                Color(
                    ColorType.NUMBERED_8,
                    BasicColor(sgr_type - 40)
                )
            )
        elif sgr_type == 49:
            change_payload = (TextStyleChange.BACKGROUND_COLOR, None)
        elif 90 <= sgr_type <= 97:
            change_payload = (
                TextStyleChange.FOREGROUND_COLOR,
                Color(
                    ColorType.NUMBERED_8_BRIGHT,
                    BasicColor(sgr_type - 90)
                )
            )
        elif 100 <= sgr_type <= 107:
            change_payload = (
                TextStyleChange.BACKGROUND_COLOR,
                Color(
                    ColorType.NUMBERED_8_BRIGHT,
                    BasicColor(sgr_type - 100)
                )
            )

        if change_payload is not None:
            events.append((EventType.TEXT_STYLE, change_payload))

    special_evs.extend(events)

def parse_csi_sequence(it, special_evs):
    """
    Parses control sequences which begin with a
    Control Sequence Introducer (CSI) as specified
    in ECMA-48, section 5.4.
    Supported escape sequences append events to
    special_evs while unsupported ones are ignored,
    and thus filtered out.

    it is a PositionedIterator which has just consumed
    the CSI; on success the whole sequence including its
    final byte has been consumed.

    Raises AssertionError if the sequence is malformed and
    StopIteration if the input ends before the sequence does.
    """
    params = it.takewhile_greedy(csi_parameter_byte)
    inters = it.takewhile_greedy(csi_intermediate_byte)
    final = it.next()

    # Raised explicitly instead of using an assert statement:
    # those are stripped when running with -O which would make
    # us swallow malformed sequences silently.
    if not csi_final_byte(final):
        raise AssertionError("invalid CSI final byte")

    # Unsupported CSI sequences are ignored which reduces
    # the noise from unsupported sequences.
    # Intermediate bytes are part of what identifies a control
    # function, so a final 'm' only denotes SGR without them.
    if final == 'm' and not inters:
        parse_sgr_sequence(params, special_evs)

class Parser(object):
    """
    Parses a subset of special control sequences read from
    a pty device. It is somewhat high level: Given a decoded,
    proper Python string it will emit a series of events
    which just need to be reflected in the UI while any state
    is tracked in the Parser object.

    The only state kept between calls to parse() is the
    unparsed tail of the previous input (an escape sequence
    which was not yet complete).
    """
    def __init__(self):
        # unparsed output left from the last call to parse
        self.__leftover = ''

    def parse(self, input):
        """
        Main interface of Parser. Given a proper decoded
        Python string, it yields a series of tuples of the
        form (EventType, payload) which the caller can
        iterate through. Valid events are:

        * EventType.TEXT has a string slice as its payload
          which should be appended to the terminal buffer as is.

        * EventType.BELL has no payload (None) and indicates that
          the bell character '\a' was in the terminal input.
          This usually should trigger the machine to beep
          and/or the window to set the urgent flag.

        * EventType.TEXT_STYLE has a tuple of the form
          (TextStyleChange, value) as its payload and is
          generated for supported SGR escape sequences.

        Parsed control sequences are guaranteed to never
        appear in a TEXT event. This is also true for
        escape sequences which don't cause an event to
        be generated. This is true for all CSI escape
        sequences at the moment which are filtered out
        from saneterm's output in this way.

        An escape sequence which is cut off at the end of
        input is not emitted. It is kept and parsed together
        with the input of the next call.
        """

        it = PositionedIterator(self.__leftover + input)
        self.__leftover = ''

        # keep track of the start position of the slice
        # we want to emit as a TEXT event
        start = 0

        # this is set by the parser before backtracking if
        # an ANSI escape sequence should be ignored, e. g.
        # if we don't support it
        ignore_esc = False

        # we expect a decoded string as input,
        # so we don't need to handle incremental
        # decoding here as well
        for code in it:
            # if flush_until is set, a slice of the buffer
            # from start to flush_until will be emitted as
            # a TEXT event
            flush_until = None
            # if not empty, each of its elements will be yielded
            # one by one, but only after any necessary flushing
            special_evs = []

            # control characters flush before advancing pos
            # in order to not add them to the buffer -- we
            # want to handle them ourselves instead of
            # relying on gtk's default behavior.
            if code == '\a':
                flush_until = it.pos
                special_evs.append((EventType.BELL, None))
            elif code == '\033':
                # ignore_esc can be set if we encounter a '\033'
                # which is followed by a sequence we don't understand.
                # In that case we'll jump back to the '\033', but just
                # treat it as if it was an ordinary character.
                if ignore_esc:
                    ignore_esc = False
                else:
                    flush_until = it.pos

                    # if parsing fails we'll return to this point
                    it.waypoint()

                    try:
                        if it.next() == '[':
                            parse_csi_sequence(it, special_evs)
                        else:
                            # we only parse CSI sequences for now, all other
                            # sequences will be rendered as text to the terminal.
                            # This probably should change in the future since
                            # we also want to filter out, e. g. OSC sequences
                            ignore_esc = True

                    except AssertionError:
                        # AssertionError indicates a parse error, we'll render
                        # an escape sequence we can't parse verbatim for now
                        ignore_esc = True

                    except StopIteration:
                        # the full escape sequence wasn't contained in
                        # this chunk of input, so we'll parse it next time.
                        # Since we flush up to the escape sequence, we know
                        # where it started. The parser loop will exit at the
                        # end of this iteration because the iterator is
                        # exhausted.
                        self.__leftover = it.wrapped[flush_until:]

                        # prevent a backtrack which would break
                        # (this can't happen in the current code, but is
                        # a subtle problem in practice, so this line could
                        # save us some debugging later)
                        ignore_esc = False

                    # if we want to add the (invalid) escape sequence to the
                    # TermView verbatim, we'll need to backtrack as well
                    if ignore_esc:
                        it.backtrack()


            # at the end of input, flush if we aren't already
            if flush_until == None and it.empty():
                flush_until = it.pos + 1

            # only generate text event if it is non empty, …
            if flush_until != None and flush_until > start:
                yield (EventType.TEXT, it.wrapped[start:flush_until])

            # … but advance as if we had flushed: the next TEXT slice
            # starts right after whatever has been consumed so far
            # (after a backtrack this is the '\033' itself)
            if flush_until != None:
                start = it.pos + 1

            if len(special_evs) > 0:
                for ev in special_evs:
                    yield ev