File: test_main_loop.py

package info (click to toggle)
urwid 2.6.16-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,236 kB
  • sloc: python: 29,290; javascript: 382; sh: 34; makefile: 22
file content (117 lines) | stat: -rw-r--r-- 3,860 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from __future__ import annotations

import concurrent.futures
import os
import socket
import sys
import threading
import typing
import unittest.mock

import urwid

if typing.TYPE_CHECKING:
    from types import TracebackType

IS_WINDOWS = sys.platform == "win32"


class ClosingTemporaryFilesPair(typing.ContextManager[typing.Tuple[typing.TextIO, typing.TextIO]]):
    """File pair context manager that closes temporary files on exit.

    Since `sys.stdout` is TextIO, tests have to use compatible api for the proper behavior imitation.
    """

    __slots__ = ("rd_s", "wr_s", "rd_f", "wr_f")

    def __init__(self) -> None:
        self.rd_s: socket.socket | None = None
        self.wr_s: socket.socket | None = None
        self.rd_f: typing.TextIO | None = None
        self.wr_f: typing.TextIO | None = None

    def __enter__(self) -> tuple[typing.TextIO, typing.TextIO]:
        self.rd_s, self.wr_s = socket.socketpair()
        self.rd_f = os.fdopen(self.rd_s.fileno(), "r", encoding="utf-8", closefd=False)
        self.wr_f = os.fdopen(self.wr_s.fileno(), "w", encoding="utf-8", closefd=False)
        return self.rd_f, self.wr_f

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close everything explicit without waiting for garbage collected."""
        if self.rd_f is not None and not self.rd_f.closed:
            self.rd_f.close()
        if self.rd_s is not None:
            self.rd_s.close()
        if self.wr_f is not None and not self.wr_f.closed:
            self.wr_f.close()
        if self.wr_s is not None:
            self.wr_s.close()


def stop_screen_cb(*_args, **_kwargs) -> typing.NoReturn:
    raise urwid.ExitMainLoop


class TestMainLoop(unittest.TestCase):
    @unittest.skipIf(IS_WINDOWS, "selectors for pipe are not supported on Windows")
    def test_watch_pipe(self):
        """Test watching pipe is stopped on explicit False only."""
        evt = threading.Event()  # We need thread synchronization
        outcome: list[bytes] = []

        def pipe_cb(data: bytes) -> typing.Any:
            outcome.append(data)

            if not evt.is_set():
                evt.set()

            if data == b"false":
                return False
            if data == b"true":
                return True
            if data == b"null":
                return None
            return object()

        def pipe_writer(fd: int) -> None:
            os.write(fd, b"something")
            if evt.wait(0.1):
                evt.clear()
                os.write(fd, b"true")
            if evt.wait(0.1):
                evt.clear()
                os.write(fd, b"null")
            if evt.wait(0.1):
                evt.clear()
                os.write(fd, b"false")

        with ClosingTemporaryFilesPair() as (
            rd_r,
            wr_r,
        ), ClosingTemporaryFilesPair() as (
            rd_w,
            wr_w,
        ), concurrent.futures.ThreadPoolExecutor(
            max_workers=1,
        ) as executor, unittest.mock.patch(
            "subprocess.Popen",  # we want to be sure that nothing outside is called
            autospec=True,
        ):
            evl = urwid.MainLoop(
                urwid.SolidFill(),
                screen=urwid.display.raw.Screen(input=rd_r, output=wr_w),  # We need screen which support mocked IO
                handle_mouse=False,  # Less external calls - better
            )
            evl.set_alarm_in(1, stop_screen_cb)
            pipe_fd = evl.watch_pipe(pipe_cb)
            executor.submit(pipe_writer, pipe_fd)

            evl.run()
            self.assertEqual([b"something", b"true", b"null", b"false"], outcome)
            not_removed = evl.remove_watch_pipe(pipe_fd)
            self.assertFalse(not_removed)