File: test_stream.py

package info (click to toggle)
aioconsole 0.8.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 332 kB
  • sloc: python: 2,022; makefile: 6
file content (242 lines) | stat: -rw-r--r-- 7,279 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import os
import io
import gc
import sys
import pytest
import asyncio
from unittest.mock import Mock

from aioconsole.stream import create_standard_streams, ainput, aprint
from aioconsole.stream import is_pipe_transport_compatible, get_standard_streams


@pytest.mark.skipif(sys.platform == "win32", reason="Not supported on windows")
@pytest.mark.asyncio
async def test_create_standard_stream_with_pipe(is_uvloop):
    """Round-trip data through standard streams backed by OS pipes.

    stdin and stdout are the two ends of one pipe, so text written to the
    stdout writer comes back through the stdin reader. stderr gets its own
    pipe and is read back directly from the file descriptor. Finally, the
    test checks that closing the writer transport does not call
    ``stdout.close()``.
    """
    loop_read_fd, loop_write_fd = os.pipe()
    err_read_fd, err_write_fd = os.pipe()
    stdin = open(loop_read_fd)
    stdout = open(loop_write_fd, "w")
    stderr = open(err_write_fd, "w")

    for stream in (stdin, stdout, stderr):
        assert is_pipe_transport_compatible(stream)

    reader, out_writer, err_writer = await create_standard_streams(
        stdin, stdout, stderr
    )

    # stdout -> stdin loopback through the shared pipe
    out_writer.write("a\n")
    await out_writer.drain()
    assert (await reader.readline()) == b"a\n"

    # stderr is checked on the raw read end of its own pipe
    err_writer.write("b\n")
    await err_writer.drain()
    assert os.read(err_read_fd, 2) == b"b\n"

    # Replace stdout.close() with a Mock so that any call can be detected,
    # keeping the real method around for the cleanup below
    real_close = stdout.close
    stdout.close = Mock()

    # Close the transport and drop every reference to the stream objects
    out_writer.transport.close()
    del reader, out_writer, err_writer
    gc.collect()  # Force garbage collection - necessary for pypy

    # The transport has been closed, but stdout itself must not have been
    assert not stdout.close.called

    # Weirdly enough, uvloop DID close the file descriptor.
    # Probably because it used the file descriptor directly instead
    # of the pipe object. However, this should not be an issue since
    # file descriptors 0, 1, 2 do not seem to be affected by this.
    if is_uvloop:
        return
    real_close()


@pytest.mark.asyncio
async def test_create_standard_stream_with_non_pipe(monkeypatch):
    """Exercise the standard streams created from ``io.StringIO`` objects.

    Covers, in order: writing ``str`` through both writers and reading the
    input back as ``bytes``, several writes followed by ``close()`` (after
    which ``write()`` must raise ``RuntimeError``), ``read(n)``, ``at_eof()``,
    asynchronous iteration over the reader, and propagation of exceptions
    raised by the underlying ``readline`` / ``read`` methods.
    """
    stdin = io.StringIO("a\nb\nc\nd\n")
    stdout = io.StringIO()
    stderr = io.StringIO()
    reader, writer1, writer2 = await create_standard_streams(stdin, stdout, stderr)

    writer1.write("a\n")
    await writer1.drain()
    data = await reader.readline()
    assert data == b"a\n"
    assert stdout.getvalue() == "a\n"

    writer2.write("b\n")
    await writer2.drain()
    data = await reader.readline()
    assert data == b"b\n"
    assert stderr.getvalue() == "b\n"

    # Multiple writes
    writer2.write("c\n")
    writer2.write("d\n")
    await asyncio.sleep(0.1)
    writer2.write("e\n")
    writer2.close()
    assert writer2.is_closing()
    await writer2.wait_closed()
    assert stderr.getvalue() == "b\nc\nd\ne\n"
    with pytest.raises(RuntimeError):
        writer2.write("f\n")

    data = await reader.read(2)
    assert data == b"c\n"

    assert reader.at_eof() is False

    # Collect everything the iterator yields and compare the whole list:
    # asserting inside the loop body would pass vacuously if the reader
    # yielded nothing at all.
    remaining = [line async for line in reader]
    assert remaining == [b"d\n"]

    assert reader.at_eof() is True

    # Check exception handling in the daemon thread

    class KeyboardInterruptLike(BaseException):
        pass

    def raise_keyboard_interrupt(*args):
        raise KeyboardInterruptLike

    def raise_os_error(*args):
        raise OSError

    # A regular exception raised by the underlying stream reaches the caller
    monkeypatch.setattr(stdin, "readline", raise_os_error)
    with pytest.raises(OSError):
        await reader.readline()

    # So does one that derives from BaseException only
    monkeypatch.setattr(stdin, "read", raise_keyboard_interrupt)
    with pytest.raises(KeyboardInterruptLike):
        await reader.read()


def mock_stdio(monkeypatch, input_text="", disable_stdin=False):
    """Replace ``sys.stdin``, ``sys.stdout`` and ``sys.stderr`` with stand-ins.

    Args:
        monkeypatch: object exposing ``setattr(target, value)``; the three
            replacements are installed through it.
        input_text: initial content of the in-memory stdin.
        disable_stdin: when true, ``sys.stdin`` is set to ``None`` instead of
            an in-memory stream.
    """
    if disable_stdin:
        fake_stdin = None
    else:
        fake_stdin = io.StringIO(input_text)

    replacements = (
        ("sys.stdin", fake_stdin),
        ("sys.stdout", io.StringIO()),
        ("sys.stderr", io.StringIO()),
    )
    for target, stream in replacements:
        monkeypatch.setattr(target, stream)


@pytest.mark.asyncio
async def test_ainput_with_standard_stream(monkeypatch):
    """Read two lines with ``ainput()`` from the patched ``sys`` streams."""
    mock_stdio(monkeypatch, "a\nb\n")

    # The first call has no prompt, the second one uses ">>> "
    first_line = await ainput()
    assert first_line == "a"
    second_line = await ainput(">>> ")
    assert second_line == "b"

    # Only the prompt ends up on stdout; stderr stays empty
    captured_out = sys.stdout.getvalue()
    assert captured_out == ">>> "
    captured_err = sys.stderr.getvalue()
    assert captured_err == ""


@pytest.mark.asyncio
async def test_aprint_with_standard_stream(monkeypatch):
    """Print a short and a 64 KiB message with ``aprint()`` to patched stdout."""
    mock_stdio(monkeypatch)

    # Several positional arguments are joined with a space
    await aprint("ab", "cd")
    expected_output = "ab cd\n"
    assert sys.stdout.getvalue() == expected_output

    # A large payload is appended to what was printed before
    large_payload = "a" * 1024 * 64
    await aprint(large_payload)
    expected_output = expected_output + large_payload + "\n"
    assert sys.stdout.getvalue() == expected_output

    assert sys.stderr.getvalue() == ""


@pytest.mark.parametrize("flush", [False, True])
@pytest.mark.asyncio
async def test_aprint_flush_argument(monkeypatch, flush):
    """Check ``aprint()`` output with and without the ``flush`` argument."""
    expected = "a\n"
    mock_stdio(monkeypatch)
    await aprint("a", flush=flush)
    if not flush:
        # Might or might not be there yet, depending on internal logic
        so_far = sys.stdout.getvalue()
        assert so_far == "" or so_far == expected
        # An empty, flushed print forces the pending output out
        await aprint("", end="", flush=True)
    assert sys.stdout.getvalue() == expected


@pytest.mark.asyncio
async def test_read_from_closed_pipe(is_uvloop):
    """Read a line from a stdin pipe whose write end is already closed.

    The stdin pipe is filled with one line and its write end is closed before
    the streams are created. ``ainput()`` must still return that line, and
    the prompt must be found on the stdout pipe once the writers are closed.

    Skipped under uvloop, where the test is reported to be flaky.
    """
    if is_uvloop:
        pytest.skip("This test is flaky with uvloop for some reason.")

    stdin_r, stdin_w = os.pipe()
    stdout_r, stdout_w = os.pipe()
    stderr_r, stderr_w = os.pipe()

    # Pre-fill the stdin pipe; leaving the ``with`` block closes its write end
    with open(stdin_w, "wb") as stdin:
        stdin.write(b"hello\n")

    f_stdin = open(stdin_r, "r")
    f_stdout = open(stdout_w, "w")
    f_stderr = open(stderr_w, "w")

    reader, writer1, writer2 = await create_standard_streams(
        f_stdin, f_stdout, f_stderr
    )

    result = await ainput(">>> ", streams=(reader, writer1))
    assert result == "hello"

    # Close the write ends so that reading the pipes below reaches EOF
    writer1.close()
    await writer1.wait_closed()
    f_stdout.close()

    writer2.close()
    await writer2.wait_closed()
    f_stderr.close()

    # Context managers make sure the read ends are closed as well instead of
    # being left to the garbage collector
    with open(stdout_r) as captured_stdout:
        assert captured_stdout.read() == ">>> "
    with open(stderr_r) as captured_stderr:
        assert captured_stderr.read() == ""


@pytest.mark.skipif(sys.platform == "win32", reason="Not supported on windows")
@pytest.mark.asyncio
async def test_standard_stream_pipe_buffering(is_uvloop):
    """Check back pressure when a 4 MB blob goes through pipe-backed streams.

    stdin and stdout are the two ends of one pipe. After the first short line
    has been read, the pending ``drain()`` must not be done yet and the
    reader buffer must hold less than the whole blob.
    """
    if is_uvloop:
        pytest.skip("This test is flaky with uvloop for some reason.")
    loop_read_fd, loop_write_fd = os.pipe()
    _, err_write_fd = os.pipe()
    stdin = open(loop_read_fd)
    stdout = open(loop_write_fd, "w")
    stderr = open(err_write_fd, "w")

    for stream in (stdin, stdout, stderr):
        assert is_pipe_transport_compatible(stream)

    reader, out_writer, _err_writer = await create_standard_streams(
        stdin, stdout, stderr
    )

    blob_size = 4 * 1024 * 1024  # 4 MB
    expected_blob_line = b"b" * blob_size + b"\n"
    out_writer.write("a\n" + "b" * blob_size + "\n")
    drain_task = asyncio.ensure_future(out_writer.drain())
    first_line = await reader.readline()
    assert first_line == b"a\n"

    # Check back pressure
    await asyncio.sleep(0.1)
    assert not drain_task.done()
    assert len(reader._buffer) < blob_size

    # Reading the blob lets the pending drain complete
    second_line = await reader.readline()
    assert second_line == expected_blob_line
    await drain_task


@pytest.mark.asyncio
async def test_aprint_with_no_stdin(monkeypatch):
    """Check ``aprint()`` and ``ainput()`` when ``sys.stdin`` is ``None``.

    Printing must still work, while every attempt to read must raise a
    ``RuntimeError`` carrying the "lost sys.stdin" message.
    """
    lost_stdin_message = "ainput(): lost sys.stdin"
    mock_stdio(monkeypatch, disable_stdin=True)

    await aprint("test1")
    assert sys.stdout.getvalue() == "test1\n"
    assert sys.stderr.getvalue() == ""

    # The prompt is still written to stdout before the read fails
    with pytest.raises(RuntimeError) as ctx:
        await ainput("test2")
    assert str(ctx.value) == lost_stdin_message
    assert sys.stdout.getvalue() == "test1\ntest2"
    assert sys.stderr.getvalue() == ""

    # Test the methods specifically
    reader, _ = await get_standard_streams()
    for read_call in (lambda: reader.read(10), reader.readline):
        with pytest.raises(RuntimeError) as ctx:
            await read_call()
        assert str(ctx.value) == lost_stdin_message