File: test_device_attribute.py

package info (click to toggle)
python-blessed 1.25-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,812 kB
  • sloc: python: 14,645; makefile: 13; sh: 7
file content (368 lines) | stat: -rw-r--r-- 12,946 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
"""Tests for DeviceAttribute class and Terminal.get_device_attributes().

DA1 (Primary Device Attributes) Response Format
================================================

Terminal response: ESC [ ? {service_class} ; {ext1} ; {ext2} ; ... c

Service Class (Terminal Type):
  1  = VT101
  62 = VT220
  63 = VT320
  64 = VT420
  65 = VT500-series

Extension Codes (Capability Flags):
  1  = 132-columns
  2  = Printer
  3  = ReGIS graphics
  4  = Sixel graphics
  6  = Selective erase
  7  = Soft character set (DRCS)
  8  = User-defined keys
  9  = National Replacement Character sets
  15 = Technical characters
  16 = Locator port
  17 = Terminal state interrogation
  18 = User windows
  21 = Horizontal scrolling
  22 = Color (ANSI color support)

Example: '\x1b[?64;1;2;4c' = VT420 with 132-col, Printer, and Sixel support
"""
# std
import time
import io

# 3rd party
import pytest

# local
from .conftest import TEST_KEYBOARD, IS_WINDOWS
from .accessories import (
    TestTerminal,
    pty_test,
    as_subprocess,
)
from blessed.keyboard import DeviceAttribute

pytestmark = pytest.mark.skipif(
    not TEST_KEYBOARD or IS_WINDOWS,
    reason="Timing-sensitive tests please do not run on build farms.")


@pytest.mark.parametrize("response,service_class,extensions,supports_sixel", [
    ('\x1b[?64;1;2;4;7c', 64, {1, 2, 4, 7}, True),
    ('\x1b[?64;1;2c', 64, {1, 2}, False),
    ('\x1b[?1c', 1, set(), False),
    ('\x1b[?62;1;4;6c', 62, {1, 4, 6}, True),
])
def test_device_attribute_from_match(response, service_class, extensions, supports_sixel):
    """Test DeviceAttribute.from_match() with various response formats."""
    match = DeviceAttribute.RE_RESPONSE.match(response)
    da = DeviceAttribute.from_match(match)
    assert da is not None
    assert da.service_class == service_class
    assert da.extensions == extensions
    assert da.supports_sixel is supports_sixel
    assert da.raw == response


@pytest.mark.parametrize("invalid_input", ['invalid', ''])
def test_device_attribute_from_match_invalid(invalid_input):
    """Test DeviceAttribute.from_match() with invalid input."""
    match = DeviceAttribute.RE_RESPONSE.match(invalid_input)
    assert match is None


def test_device_attribute_repr():
    """Test DeviceAttribute.__repr__()."""
    # DA1 response: VT420 (64) with Sixel (4) extension only
    da = DeviceAttribute('\x1b[?64;4c', 64, [4])
    repr_str = repr(da)
    assert 'DeviceAttribute' in repr_str
    assert 'service_class=64' in repr_str
    assert 'supports_sixel=True' in repr_str


def test_get_device_attributes_via_ungetch():
    """Test get_device_attributes() with response via ungetch."""
    def child(term):
        # DA1 response: VT420 (64) with 132-col (1), Printer (2), Sixel (4)
        term.ungetch('\x1b[?64;1;2;4c')
        da = term.get_device_attributes(timeout=0.01)
        assert da is not None
        assert da.service_class == 64  # VT420
        assert da.supports_sixel is True
        assert 4 in da.extensions
        return b'OK'

    output = pty_test(child, parent_func=None, test_name='test_get_device_attributes_via_ungetch')
    assert output == '\x1b[cOK'


def test_get_device_attributes_timeout():
    """Test get_device_attributes() timeout without response."""
    def child(term):
        stime = time.time()
        da = term.get_device_attributes(timeout=0.1)
        elapsed = time.time() - stime
        assert da is None
        assert 0.08 <= elapsed <= 0.15
        return b'TIMEOUT'

    output = pty_test(child, parent_func=None, test_name='test_get_device_attributes_timeout')
    assert output == '\x1b[cTIMEOUT'


def test_get_device_attributes_force_bypass_cache():
    """Test get_device_attributes() with force=True bypasses cache."""
    def child(term):
        # DA1 response 1: VT420 (64) with 132-col (1)
        term.ungetch('\x1b[?64;1c')
        da1 = term.get_device_attributes(timeout=0.01)

        # DA1 response 2: VT500-series (65) with Printer (2)
        term.ungetch('\x1b[?65;2c')
        da2 = term.get_device_attributes(timeout=0.01, force=True)

        assert da1 is not None
        assert da2 is not None
        assert da1.service_class == 64  # VT420
        assert da2.service_class == 65  # VT500-series
        assert da1 is not da2

        return b'FORCED'

    output = pty_test(child, parent_func=None,
                      test_name='test_get_device_attributes_force_bypass_cache')
    assert output == '\x1b[c\x1b[cFORCED'


def test_get_device_attributes_no_force_uses_cache():
    """Test get_device_attributes() without force uses cached result."""
    def child(term):
        # DA1 response 1: VT420 (64) with 132-col (1)
        term.ungetch('\x1b[?64;1c')
        da1 = term.get_device_attributes(timeout=0.01)

        # Second query without force should use cache even with different ungetch data
        # DA1 response 2: VT500-series (65) with Printer (2) - but this is ignored due to cache
        term.ungetch('\x1b[?65;2c')
        da2 = term.get_device_attributes(timeout=0.01, force=False)

        assert da1 is not None
        assert da2 is not None
        assert da1 is da2
        assert da1.service_class == 64  # VT420 (cached)
        assert da2.service_class == 64  # VT420 (cached)

        return b'NO_FORCE'

    output = pty_test(child, parent_func=None,
                      test_name='test_get_device_attributes_no_force_uses_cache')
    assert output == '\x1b[cNO_FORCE'


def test_get_device_attributes_retry_after_failure():
    """Test get_device_attributes() can retry after failed query with force=True."""
    def child(term):
        # First query fails (timeout)
        da1 = term.get_device_attributes(timeout=0.01)

        # Second query succeeds with force=True: VT420 (64) with Sixel (4)
        term.ungetch('\x1b[?64;4c')
        da2 = term.get_device_attributes(timeout=0.01, force=True)

        assert da1 is None
        assert da2 is not None
        assert da2.service_class == 64  # VT420
        assert da2.supports_sixel is True

        return b'RETRY'

    output = pty_test(child, parent_func=None,
                      test_name='test_get_device_attributes_retry_after_failure')
    assert output == '\x1b[c\x1b[cRETRY'


def test_get_device_attributes_sticky_failure():
    """Test get_device_attributes() sticky failure prevents repeated queries."""
    def child(term):
        # First query fails (timeout)
        da1 = term.get_device_attributes(timeout=0.01)

        # Second query should return None immediately due to sticky failure
        term.ungetch('\x1b[?64;4c')
        da2 = term.get_device_attributes(timeout=0.01)

        assert da1 is None
        assert da2 is None

        return b'STICKY'

    output = pty_test(child, parent_func=None,
                      test_name='test_get_device_attributes_sticky_failure')
    assert output == '\x1b[cSTICKY'


def test_get_device_attributes_multiple_extensions():
    """Test get_device_attributes() with many extensions."""
    def child(term):
        # DA1 response: VT420 (64) with extensions:
        # 132-col (1), Printer (2), Sixel (4), Selective erase (6), DRCS (7),
        # National Replacement Character sets (9), Technical characters (15),
        # User windows (18), Horizontal scrolling (21), Color (22)
        term.ungetch('\x1b[?64;1;2;4;6;7;9;15;18;21;22c')
        da = term.get_device_attributes(timeout=0.01)
        assert da is not None
        assert da.service_class == 64  # VT420
        assert da.extensions == {1, 2, 4, 6, 7, 9, 15, 18, 21, 22}
        assert da.supports_sixel is True
        return b'MULTI'

    output = pty_test(child, parent_func=None,
                      test_name='test_get_device_attributes_multiple_extensions')
    assert output == '\x1b[cMULTI'


def test_device_attribute_init_with_none_extensions():
    """Test DeviceAttribute.__init__() with None extensions."""
    # DA1 response: VT101 (1) with no extensions
    da = DeviceAttribute('\x1b[?1c', 1, None)
    assert da.service_class == 1  # VT101
    assert da.extensions == set()
    assert da.supports_sixel is False


def test_device_attribute_init_with_list_extensions():
    """Test DeviceAttribute.__init__() with list of extensions."""
    # DA1 response: VT420 (64) with Sixel (4) extension only
    da = DeviceAttribute('\x1b[?64;4c', 64, [4])
    assert da.service_class == 64  # VT420
    assert da.extensions == {4}
    assert da.supports_sixel is True


def test_device_attribute_raw_stored():
    """Test DeviceAttribute stores raw response string."""
    raw = '\x1b[?64;1;2;4c'
    match = DeviceAttribute.RE_RESPONSE.match(raw)
    da = DeviceAttribute.from_match(match)
    assert da is not None
    assert da.raw == raw


def test_get_kitty_keyboard_state_boundary_neither_response():
    """Test boundary detection when neither Kitty nor DA1 response matches."""
    @as_subprocess
    def child():
        stream = io.StringIO()
        term = TestTerminal(stream=stream, force_styling=True)
        term._is_a_tty = True

        term.ungetch('garbage_response')
        flags = term.get_kitty_keyboard_state(timeout=0.01)
        assert flags is None
        assert term._kitty_kb_first_query_attempted is True
        assert term._kitty_kb_first_query_failed is True

        flags2 = term.get_kitty_keyboard_state(timeout=1.0)
        assert flags2 is None
    child()


def test_get_kitty_keyboard_state_boundary_da1_only():
    """Test boundary detection when only DA1 responds."""
    @as_subprocess
    def child():
        stream = io.StringIO()
        term = TestTerminal(stream=stream, force_styling=True)
        term._is_a_tty = True

        # DA1 response: VT420 (64) with 132-col (1), Printer (2) - no Kitty protocol
        term.ungetch('\x1b[?64;1;2c')
        flags = term.get_kitty_keyboard_state(timeout=0.01)
        assert flags is None
        assert term._kitty_kb_first_query_attempted is True
        assert term._kitty_kb_first_query_failed is True

        flags2 = term.get_kitty_keyboard_state(timeout=1.0)
        assert flags2 is None
    child()


def test_enable_kitty_keyboard_after_query_failed():
    """Test enable_kitty_keyboard yields without emitting sequences after query failed."""
    @as_subprocess
    def child():
        stream = io.StringIO()
        term = TestTerminal(stream=stream, force_styling=True)
        term._is_a_tty = True

        term._kitty_kb_first_query_failed = True

        with term.enable_kitty_keyboard(disambiguate=True, timeout=0.01, force=False):
            pass

        assert stream.getvalue() == ''
    child()


def test_device_attribute_from_match_with_malformed_extensions():
    """Test DeviceAttribute.from_match() with malformed extension strings."""
    # Test with non-digit extension parts (should be filtered out)
    match = DeviceAttribute.RE_RESPONSE.match('\x1b[?64;abc;4;xyz;7c')
    if match:
        # Manually test parsing logic
        da = DeviceAttribute.from_match(match)
        # Should only include valid numeric extensions
        assert da.service_class == 64
        assert 4 in da.extensions
        assert 7 in da.extensions


def test_device_attribute_from_match_with_whitespace_extensions():
    """Test DeviceAttribute.from_match() with whitespace in extensions."""
    # Create a match with extensions that have whitespace
    # This tests the part.strip() and part.isdigit() checks

    # Since the regex won't match whitespace, let's test the code path
    # by using extensions_str that could have spaces
    # Actually, we need to manually construct to test lines 2095-2097
    # The regex pattern won't capture whitespace, so this branch may be defensive
    # Let's test with empty extension parts
    match = DeviceAttribute.RE_RESPONSE.match('\x1b[?64;;4;;c')
    if match:
        da = DeviceAttribute.from_match(match)
        assert da.service_class == 64
        # Empty parts should be filtered out
        assert da.extensions == {4}


def test_kitty_keyboard_protocol_eq_with_int():
    """Test KittyKeyboardProtocol.__eq__() with int."""
    from blessed.keyboard import KittyKeyboardProtocol
    proto = KittyKeyboardProtocol(15)
    assert proto == 15
    assert proto != 20


def test_kitty_keyboard_protocol_eq_with_protocol():
    """Test KittyKeyboardProtocol.__eq__() with another KittyKeyboardProtocol."""
    from blessed.keyboard import KittyKeyboardProtocol
    proto1 = KittyKeyboardProtocol(15)
    proto2 = KittyKeyboardProtocol(15)
    proto3 = KittyKeyboardProtocol(20)
    assert proto1 == proto2
    assert proto1 != proto3


def test_kitty_keyboard_protocol_eq_with_other_types():
    """Test KittyKeyboardProtocol.__eq__() with non-int, non-KittyKeyboardProtocol types."""
    from blessed.keyboard import KittyKeyboardProtocol
    proto = KittyKeyboardProtocol(15)
    assert proto != "15"
    assert proto != [15]
    assert proto is not None
    assert proto != {"value": 15}