File: escape-code-parser.go

package info (click to toggle)
kitty 0.42.1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 28,564 kB
  • sloc: ansic: 82,787; python: 55,191; objc: 5,122; sh: 1,295; xml: 364; makefile: 143; javascript: 78
file content (336 lines) | stat: -rw-r--r-- 7,597 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
// License: GPLv3 Copyright: 2022, Kovid Goyal, <kovid at kovidgoyal.net>

package wcswidth

import (
	"bytes"
	"fmt"

	"github.com/kovidgoyal/kitty/tools/utils"
)

// Reference fmt so the import stays valid even when nothing else in the file
// uses it.
var _ = fmt.Print

// parser_state identifies which part of the byte stream the parser is
// currently inside (plain text, an escape code of some kind, a paste, ...).
type parser_state uint8

// csi_state tracks progress inside a CSI sequence: still reading parameter
// bytes, or already past them into intermediate bytes.
type csi_state uint8

// csi_char_type is the classification csi_type assigns to a byte that occurs
// inside a CSI sequence.
type csi_char_type uint8

// bracketed_paste_start is the CSI body (parameter bytes plus final byte) that
// switches the parser into bracketed paste mode instead of being reported via
// HandleCSI.
var bracketed_paste_start = []byte{'2', '0', '0', '~'}

// Values of parser_state.
const (
	// normal: plain text; bytes are UTF-8 decoded and dispatched as code points.
	normal parser_state = iota
	// esc: an ESC was seen; the next byte selects the kind of escape code.
	esc
	// csi: inside a CSI sequence, collecting bytes up to the final byte.
	csi
	// st: inside a string payload (DCS/SOS/PM/APC) that ends at a string
	// terminator: ESC \ or the bytes 0xc2 0x9c.
	st
	// st_or_bel: like st, but a BEL (0x07) byte also ends the payload; used for OSC.
	st_or_bel
	// esc_st: an ESC was seen inside a string payload; a following '\' ends it.
	esc_st
	// c1_st: a 0xc2 byte was seen inside a string payload; a following 0x9c ends it.
	c1_st
	// bracketed_paste: between CSI 200~ and ESC [ 201 ~; text is decoded as in
	// normal but only the end-of-paste marker is recognized.
	bracketed_paste
)

// Values of csi_state.
const (
	// parameter: only parameter bytes (or nothing) have been seen so far.
	parameter csi_state = iota
	// intermediate: an intermediate byte has been seen; a parameter byte is no
	// longer valid and aborts the sequence.
	intermediate
)

// Values of csi_char_type, as returned by csi_type.
const (
	// unknown_csi_char: the byte is not valid inside a CSI sequence.
	unknown_csi_char csi_char_type = iota
	// parameter_csi_char: 0x30-0x3f, or '-'.
	parameter_csi_char
	// intermediate_csi_char: 0x20-0x2f, except '-' which counts as a parameter.
	intermediate_csi_char
	// final_csi_char: 0x40-0x7e; ends the sequence.
	final_csi_char
)

// EscapeCodeParser is an incremental parser that splits a byte stream into
// decoded text and escape codes and reports each through the callback fields
// below. Bytes can be fed one at a time or in chunks; partial sequences are
// kept across calls.
//
// Any callback may be left nil, in which case the corresponding item is
// silently discarded. The byte slices handed to the escape code callbacks
// point into an internal buffer that is truncated and reused once the callback
// returns, so a callback must copy the data if it needs to keep it.
type EscapeCodeParser struct {
	// Which part of the stream is currently being parsed.
	state                  parser_state
	// Decoder state and the code point being assembled, as maintained by
	// utils.DecodeUtf8.
	utf8_state, utf8_codep utils.UTF8State
	// Progress inside the current CSI sequence; only meaningful while state == csi.
	csi_state              csi_state
	// Bytes of the escape code collected so far (introducer excluded).
	current_buffer         []byte
	// Code points held back during a bracketed paste because they might be the
	// beginning of the end-of-paste marker ESC [ 201 ~.
	bracketed_paste_buffer []utils.UTF8State
	// Handler to invoke when the escape code in progress completes.
	current_callback       func([]byte) error

	// When true, a byte that cannot start a UTF-8 sequence is reported as
	// U+FFFD via HandleRune; when false it is dropped.
	ReplaceInvalidUtf8Bytes bool

	// Callbacks
	// HandleRune receives every decoded code point that is not part of an
	// escape code, including pasted text.
	HandleRune                func(rune) error
	// HandleEndOfBracketedPaste is called when the end-of-paste marker is seen.
	HandleEndOfBracketedPaste func() error
	// HandleCSI receives the bytes following the CSI introducer, including the
	// final byte. The bracketed paste start sequence (200~) is not reported.
	HandleCSI                 func([]byte) error
	// HandleOSC, HandleDCS, HandlePM, HandleSOS and HandleAPC receive the
	// payload between the respective introducer and the terminator; the
	// terminator itself is not included.
	HandleOSC                 func([]byte) error
	HandleDCS                 func([]byte) error
	HandlePM                  func([]byte) error
	HandleSOS                 func([]byte) error
	HandleAPC                 func([]byte) error
}

// InBracketedPaste reports whether the parser is currently between a
// bracketed paste start sequence and its matching end marker.
func (self *EscapeCodeParser) InBracketedPaste() bool { return self.state == bracketed_paste }

// ParseString feeds the bytes of s to the parser. It is equivalent to calling
// Parse on the byte representation of s and returns the first error produced
// by a callback, if any.
func (self *EscapeCodeParser) ParseString(s string) error {
	return self.Parse(utils.UnsafeStringToBytes(s))
}

// ParseByte feeds a single byte to the parser.
//
// In the normal and bracketed paste states the byte goes through the UTF-8
// decoder and every completed code point is handed to dispatch_char. In all
// other states the parser is inside an escape code and the raw byte is handed
// to dispatch_byte.
//
// If a callback returns an error, the parser is reset to its initial state and
// the error is returned. This holds for every dispatch path, including the
// U+FFFD replacement path, so that a failed call never leaves half-flushed
// paste data or a stale state behind.
func (self *EscapeCodeParser) ParseByte(b byte) error {
	if self.state != normal && self.state != bracketed_paste {
		if err := self.dispatch_byte(b); err != nil {
			self.reset_state()
			return err
		}
		return nil
	}

	prev_utf8_state := self.utf8_state
	switch utils.DecodeUtf8(&self.utf8_state, &self.utf8_codep, b) {
	case utils.UTF8_ACCEPT:
		if err := self.dispatch_char(self.utf8_codep); err != nil {
			self.reset_state()
			return err
		}
	case utils.UTF8_REJECT:
		self.utf8_state = utils.UTF8_ACCEPT
		if prev_utf8_state != utils.UTF8_ACCEPT {
			// The byte was rejected as a continuation of an unfinished
			// sequence; it may still be valid on its own, so reparse it with
			// the decoder back in its start state.
			return self.ParseByte(b)
		}
		if self.ReplaceInvalidUtf8Bytes {
			if err := self.dispatch_char(utils.UTF8State(0xfffd)); err != nil {
				self.reset_state()
				return err
			}
		}
	}
	return nil
}

// Parse feeds data to the parser one byte at a time, in order. It stops at
// the first byte for which ParseByte reports an error and returns that error;
// the remaining bytes are not consumed.
func (self *EscapeCodeParser) Parse(data []byte) error {
	for i := 0; i < len(data); i++ {
		if err := self.ParseByte(data[i]); err != nil {
			return err
		}
	}
	return nil
}

// Reset discards any partially parsed escape code, pending paste data and
// UTF-8 decoder state, returning the parser to the normal state. The callback
// fields and ReplaceInvalidUtf8Bytes are left untouched.
func (self *EscapeCodeParser) Reset() {
	self.reset_state()
}

// write_ch appends ch to the buffer holding the escape code being collected.
func (self *EscapeCodeParser) write_ch(ch byte) {
	self.current_buffer = append(self.current_buffer, ch)
}

// csi_type classifies a byte that occurs after a CSI introducer.
//
// Bytes 0x30-0x3f are parameter bytes, 0x40-0x7e are final bytes and 0x20-0x2f
// are intermediate bytes. The one exception is '-' (0x2d), which lies in the
// intermediate range but is treated as a parameter byte. Anything else is
// unknown_csi_char.
func csi_type(ch byte) csi_char_type {
	switch {
	case ch == '-', ch >= 0x30 && ch <= 0x3f:
		// '-' must be tested before the intermediate range it falls into.
		return parameter_csi_char
	case ch >= 0x40 && ch <= 0x7e:
		return final_csi_char
	case ch >= 0x20 && ch <= 0x2f:
		return intermediate_csi_char
	}
	return unknown_csi_char
}

// reset_state returns the parser to its initial condition: normal state, a
// fresh UTF-8 decoder, no pending callback and empty buffers. The buffers are
// truncated rather than released so their storage is reused by later escape
// codes.
func (self *EscapeCodeParser) reset_state() {
	// State machines back to their starting points.
	self.state = normal
	self.csi_state = parameter
	self.current_callback = nil

	// UTF-8 decoder back to "no sequence in progress".
	self.utf8_codep = utils.UTF8_ACCEPT
	self.utf8_state = utils.UTF8_ACCEPT

	// Drop buffered data but keep the backing arrays.
	self.bracketed_paste_buffer = self.bracketed_paste_buffer[:0]
	self.current_buffer = self.current_buffer[:0]
}

// dispatch_esc_code is called once a complete escape code has been collected
// in current_buffer. It hands the buffer to the callback selected when the
// escape code started (if one is set), resets the parser and returns the
// callback's error.
//
// A CSI whose body is exactly "200~" is not reported: it switches the parser
// into bracketed paste mode instead.
func (self *EscapeCodeParser) dispatch_esc_code() error {
	starts_paste := self.state == csi && bytes.Equal(self.current_buffer, bracketed_paste_start)
	if starts_paste {
		self.reset_state()
		self.state = bracketed_paste
		return nil
	}

	callback := self.current_callback
	if callback == nil {
		self.reset_state()
		return nil
	}
	// The callback runs before the reset because the reset truncates the
	// buffer it is given.
	result := callback(self.current_buffer)
	self.reset_state()
	return result
}

// invalid_escape_code abandons the escape code being collected: whatever was
// buffered is dropped without invoking any callback and the parser returns to
// the normal state.
func (self *EscapeCodeParser) invalid_escape_code() {
	self.reset_state()
}

// dispatch_rune reports the code point ch through HandleRune and returns the
// handler's error. When no handler is installed the code point is dropped and
// nil is returned.
func (self *EscapeCodeParser) dispatch_rune(ch utils.UTF8State) error {
	if self.HandleRune == nil {
		return nil
	}
	return self.HandleRune(rune(ch))
}

// bp_buffer_equals reports whether the code points currently held back in
// bracketed_paste_buffer are exactly chars: same length and same elements in
// the same order.
func (self *EscapeCodeParser) bp_buffer_equals(chars []utils.UTF8State) bool {
	pending := self.bracketed_paste_buffer
	if len(pending) != len(chars) {
		return false
	}
	for i := range pending {
		if pending[i] != chars[i] {
			return false
		}
	}
	return true
}

// dispatch_char handles one decoded code point while the parser is in the
// normal or bracketed paste state.
//
// Bracketed paste: every code point is reported through HandleRune, except
// that code points which could be the beginning of the end-of-paste marker
// ESC [ 2 0 1 ~ are held back in bracketed_paste_buffer until the marker
// either completes or is ruled out. When the marker completes the parser is
// reset and HandleEndOfBracketedPaste is called; when it is ruled out the held
// back code points are reported followed by the current one. An ESC always
// starts a fresh candidate marker, so a marker that directly follows a pasted
// ESC or a partial marker is still recognized.
//
// Normal: ESC and the C1 introducers (DCS, SOS, CSI, OSC, PM, APC) switch the
// parser into the matching escape code state; everything else is reported
// through HandleRune.
//
// The returned error is whatever the invoked callback returned.
func (self *EscapeCodeParser) dispatch_char(ch utils.UTF8State) error {
	if self.state == bracketed_paste {
		// flush_pending reports the held back code points as ordinary text.
		// On error the buffer is left as is; the caller resets the parser.
		flush_pending := func() error {
			for _, c := range self.bracketed_paste_buffer {
				if err := self.dispatch_rune(c); err != nil {
					return err
				}
			}
			self.bracketed_paste_buffer = self.bracketed_paste_buffer[:0]
			return nil
		}
		// emit reports the held back code points followed by ch.
		emit := func() error {
			if err := flush_pending(); err != nil {
				return err
			}
			return self.dispatch_rune(ch)
		}
		// hold_or_emit holds ch back if the buffer is exactly the marker
		// prefix that must precede it, otherwise ch is ordinary text.
		hold_or_emit := func(prefix ...utils.UTF8State) error {
			if !self.bp_buffer_equals(prefix) {
				return emit()
			}
			self.bracketed_paste_buffer = append(self.bracketed_paste_buffer, ch)
			return nil
		}

		switch ch {
		case 0x1b:
			// Whatever was held back cannot be a marker any more, but this
			// ESC may begin one, so flush the old candidate and start anew.
			if err := flush_pending(); err != nil {
				return err
			}
			self.bracketed_paste_buffer = append(self.bracketed_paste_buffer, ch)
			return nil
		case '[':
			return hold_or_emit(0x1b)
		case '2':
			return hold_or_emit(0x1b, '[')
		case '0':
			return hold_or_emit(0x1b, '[', '2')
		case '1':
			return hold_or_emit(0x1b, '[', '2', '0')
		case '~':
			if !self.bp_buffer_equals([]utils.UTF8State{0x1b, '[', '2', '0', '1'}) {
				return emit()
			}
			// Marker complete: leave paste mode before notifying the caller.
			self.reset_state()
			if self.HandleEndOfBracketedPaste != nil {
				return self.HandleEndOfBracketedPaste()
			}
			return nil
		default:
			return emit()
		}
	} // end self.state == bracketed_paste

	switch ch {
	case 0x1b:
		self.state = esc
	case 0x90:
		self.state = st
		self.current_callback = self.HandleDCS
	case 0x9b:
		self.state = csi
		self.current_callback = self.HandleCSI
	case 0x9d:
		self.state = st_or_bel
		self.current_callback = self.HandleOSC
	case 0x98:
		self.state = st
		self.current_callback = self.HandleSOS
	case 0x9e:
		self.state = st
		self.current_callback = self.HandlePM
	case 0x9f:
		self.state = st
		self.current_callback = self.HandleAPC
	default:
		return self.dispatch_rune(ch)
	}
	return nil
}

// dispatch_byte handles one raw byte while the parser is inside an escape
// code, i.e. in any state other than normal and bracketed paste.
//
//   - esc: the byte after ESC selects the escape code kind. 'P', 'X', '[', ']',
//     '^' and '_' start DCS, SOS, CSI, OSC, PM and APC respectively. A fixed
//     set of bytes forms complete two-byte escape sequences that are consumed
//     without being reported. Any other byte means the ESC was dangling: the
//     ESC is dropped and the byte is reparsed as ordinary input.
//   - csi: bytes are collected (final byte included) and validated with
//     csi_type; an invalid byte discards the whole sequence.
//   - st / st_or_bel: payload bytes are collected until a string terminator
//     (ESC \ or the bytes 0xc2 0x9c); in st_or_bel a BEL terminates as well.
//   - esc_st / c1_st: one byte of lookahead for the two terminator forms. If
//     the terminator does not complete, the byte that was held back is added
//     to the payload.
//
// The returned error is whatever the invoked callback returned.
func (self *EscapeCodeParser) dispatch_byte(ch byte) error {
	switch self.state {
	case esc:
		switch ch {
		case 'P':
			self.state = st
			self.current_callback = self.HandleDCS
		case 'X':
			// 7-bit form of SOS, the counterpart of C1 0x98.
			self.state = st
			self.current_callback = self.HandleSOS
		case '[':
			self.state = csi
			self.csi_state = parameter
			self.current_callback = self.HandleCSI
		case ']':
			self.state = st_or_bel
			self.current_callback = self.HandleOSC
		case '^':
			self.state = st
			self.current_callback = self.HandlePM
		case '_':
			self.state = st
			self.current_callback = self.HandleAPC
		case 'D', 'E', 'H', 'M', 'N', 'O', 'Z', '6', '7', '8', '9', '=', '>', 'F', 'c', 'l', 'm', 'n', 'o', '|', '}', '~':
			// A complete two-byte escape sequence. There is nothing to report,
			// but the parser must leave the esc state, otherwise the next byte
			// would be misread as if it followed an ESC.
			self.reset_state()
		default:
			// we drop this dangling Esc and reparse the byte after the esc
			self.reset_state()
			return self.ParseByte(ch)
		}
	case csi:
		// Collected before classification so the final byte is part of the
		// data handed to the callback.
		self.write_ch(ch)
		kind := csi_type(ch)
		switch self.csi_state {
		case parameter:
			switch kind {
			case intermediate_csi_char:
				self.csi_state = intermediate
			case final_csi_char:
				return self.dispatch_esc_code()
			case unknown_csi_char:
				self.invalid_escape_code()
			}
		case intermediate:
			switch kind {
			case parameter_csi_char, unknown_csi_char:
				// Parameters may not follow intermediates.
				self.invalid_escape_code()
			case final_csi_char:
				return self.dispatch_esc_code()
			}
		}
	case st_or_bel:
		if ch == 0x7 {
			return self.dispatch_esc_code()
		}
		fallthrough
	case st:
		switch ch {
		case 0x1b:
			self.state = esc_st
		case 0xc2:
			self.state = c1_st
		default:
			self.write_ch(ch)
		}
	case esc_st:
		// NOTE(review): when the lookahead fails the state becomes st even if
		// the payload started in st_or_bel, so a later BEL no longer
		// terminates an OSC. Fixing that needs the originating state to be
		// remembered.
		switch ch {
		case '\\':
			return self.dispatch_esc_code()
		case 0x1b:
			// The earlier ESC was payload; this one may still begin the
			// terminator, so keep waiting for the '\'.
			self.write_ch(0x1b)
		default:
			self.state = st
			self.write_ch(0x1b)
			self.write_ch(ch)
		}
	case c1_st:
		if ch == 0x9c {
			return self.dispatch_esc_code()
		}
		self.state = st
		self.write_ch(0xc2)
		self.write_ch(ch)
	}
	return nil
}