"""Tests for the standard-stream helpers imported from ``aioconsole.stream``."""
import os
import io
import gc
import sys
import pytest
import asyncio
from unittest.mock import Mock
from aioconsole.stream import create_standard_streams, ainput, aprint
from aioconsole.stream import is_pipe_transport_compatible, get_standard_streams
@pytest.mark.skipif(sys.platform == "win32", reason="Not supported on windows")
@pytest.mark.asyncio
async def test_create_standard_stream_with_pipe(is_uvloop):
    """Exercise ``create_standard_streams`` with pipe-backed file objects.

    Text written through each writer must come out of the matching pipe,
    and closing the stdout writer's transport must not call ``close()`` on
    the wrapped stdout file object.
    """
    # stdin and stdout sit on the two ends of the same pipe, so whatever is
    # written to stdout can be read back from stdin.
    in_fd, out_fd = os.pipe()
    err_read_fd, err_fd = os.pipe()
    stdin = open(in_fd)
    stdout = open(out_fd, "w")
    stderr = open(err_fd, "w")
    for stream in (stdin, stdout, stderr):
        assert is_pipe_transport_compatible(stream)

    reader, out_writer, err_writer = await create_standard_streams(
        stdin, stdout, stderr
    )

    out_writer.write("a\n")
    await out_writer.drain()
    line = await reader.readline()
    assert line == b"a\n"

    err_writer.write("b\n")
    await err_writer.drain()
    assert os.read(err_read_fd, 2) == b"b\n"

    # Swap stdout.close() for a mock so that any call to it gets recorded
    real_close = stdout.close
    stdout.close = Mock()

    # Close the transport, then drop every reference to the stream objects
    out_writer.transport.close()
    del reader, out_writer, err_writer
    gc.collect()  # Force garbage collection - necessary for pypy

    # The transport has been closed, but stdout.close() must not have run
    assert not stdout.close.called

    # Weirdly enough, uvloop DID close the file descriptor, probably because
    # it used the file descriptor directly instead of the pipe object. This
    # should not be an issue since file descriptors 0, 1, 2 do not seem to
    # be affected, so the real close() is only invoked for other loops.
    if is_uvloop:
        return
    real_close()
@pytest.mark.asyncio
async def test_create_standard_stream_with_non_pipe(monkeypatch):
    """Exercise ``create_standard_streams`` with in-memory ``StringIO`` objects.

    Covers single and repeated writes, closing a writer, reading until
    end-of-file, and propagation of exceptions raised by the wrapped stdin.
    """
    stdin = io.StringIO("a\nb\nc\nd\n")
    stdout = io.StringIO()
    stderr = io.StringIO()
    reader, out_writer, err_writer = await create_standard_streams(
        stdin, stdout, stderr
    )

    # One line out through stdout, one line in from stdin
    out_writer.write("a\n")
    await out_writer.drain()
    line = await reader.readline()
    assert line == b"a\n"
    assert stdout.getvalue() == "a\n"

    # Same exchange through stderr
    err_writer.write("b\n")
    await err_writer.drain()
    line = await reader.readline()
    assert line == b"b\n"
    assert stderr.getvalue() == "b\n"

    # Multiple writes
    for chunk in ("c\n", "d\n"):
        err_writer.write(chunk)
    await asyncio.sleep(0.1)
    err_writer.write("e\n")
    err_writer.close()
    assert err_writer.is_closing()
    await err_writer.wait_closed()
    assert stderr.getvalue() == "b\nc\nd\ne\n"

    # Writing to a closed writer is an error
    with pytest.raises(RuntimeError):
        err_writer.write("f\n")

    # Consume the rest of stdin: a sized read, then iteration up to EOF
    chunk = await reader.read(2)
    assert chunk == b"c\n"
    assert reader.at_eof() is False

    async for line in reader:
        assert line == b"d\n"
    assert reader.at_eof() is True

    # Check exception handling in the daemon thread

    class KeyboardInterruptLike(BaseException):
        """Test-only exception deriving directly from ``BaseException``."""

    def raise_os_error(*args):
        raise OSError

    def raise_keyboard_interrupt(*args):
        raise KeyboardInterruptLike

    monkeypatch.setattr(stdin, "readline", raise_os_error)
    with pytest.raises(OSError):
        await reader.readline()

    monkeypatch.setattr(stdin, "read", raise_keyboard_interrupt)
    with pytest.raises(KeyboardInterruptLike):
        await reader.read()
def mock_stdio(monkeypatch, input_text="", disable_stdin=False):
    """Replace ``sys.stdin``, ``sys.stdout`` and ``sys.stderr`` via *monkeypatch*.

    ``sys.stdout`` and ``sys.stderr`` each become a fresh ``io.StringIO``.
    ``sys.stdin`` becomes a ``StringIO`` holding *input_text*, or ``None``
    when *disable_stdin* is true.
    """
    if disable_stdin:
        fake_stdin = None
    else:
        fake_stdin = io.StringIO(input_text)

    replacements = (
        ("sys.stdin", fake_stdin),
        ("sys.stdout", io.StringIO()),
        ("sys.stderr", io.StringIO()),
    )
    for target, stream in replacements:
        monkeypatch.setattr(target, stream)
@pytest.mark.asyncio
async def test_ainput_with_standard_stream(monkeypatch):
    """``ainput`` returns lines from the patched stdin and prompts on stdout."""
    mock_stdio(monkeypatch, "a\nb\n")

    first_line = await ainput()
    assert first_line == "a"

    second_line = await ainput(">>> ")
    assert second_line == "b"

    # Only the second call had a prompt; nothing must reach stderr
    assert sys.stdout.getvalue() == ">>> "
    assert sys.stderr.getvalue() == ""
@pytest.mark.asyncio
async def test_aprint_with_standard_stream(monkeypatch):
    """``aprint`` writes to the patched stdout, for a short and a large payload."""
    mock_stdio(monkeypatch)

    await aprint("ab", "cd")
    expected = "ab cd\n"
    assert sys.stdout.getvalue() == expected

    # A 64 * 1024 character line appended after the first one
    big_line = "a" * 1024 * 64
    await aprint(big_line)
    expected = expected + big_line + "\n"
    assert sys.stdout.getvalue() == expected
    assert sys.stderr.getvalue() == ""
@pytest.mark.parametrize("flush", [False, True])
@pytest.mark.asyncio
async def test_aprint_flush_argument(monkeypatch, flush):
    """The printed text is in stdout once an ``aprint`` call has flushed."""
    mock_stdio(monkeypatch)
    await aprint("a", flush=flush)

    if flush:
        assert sys.stdout.getvalue() == "a\n"
        return

    # Might or might not be there yet, depending on internal logic
    assert sys.stdout.getvalue() in ("", "a\n")
    # An empty flushing print must push out whatever is still pending
    await aprint("", end="", flush=True)
    assert sys.stdout.getvalue() == "a\n"
@pytest.mark.asyncio
async def test_read_from_closed_pipe(is_uvloop):
    """``ainput`` returns data left in a pipe whose write end is already closed.

    The stdin pipe is filled with ``hello\\n`` and its write end is closed
    before the streams are created. The prompt must end up in the stdout
    pipe and nothing in the stderr pipe.
    """
    if is_uvloop:
        pytest.skip("This test is flaky with uvloop for some reason.")

    stdin_r, stdin_w = os.pipe()
    stdout_r, stdout_w = os.pipe()
    stderr_r, stderr_w = os.pipe()

    # Write the input and close the write end in one step
    with open(stdin_w, "wb") as stdin:
        stdin.write(b"hello\n")

    f_stdin = open(stdin_r, "r")
    f_stdout = open(stdout_w, "w")
    f_stderr = open(stderr_w, "w")

    reader, writer1, writer2 = await create_standard_streams(
        f_stdin, f_stdout, f_stderr
    )

    result = await ainput(">>> ", streams=(reader, writer1))
    assert result == "hello"

    # Close both write ends so the reads below hit end-of-file
    writer1.close()
    await writer1.wait_closed()
    f_stdout.close()

    writer2.close()
    await writer2.wait_closed()
    f_stderr.close()

    # Read the captured output through context managers so the read ends are
    # closed explicitly instead of being left to garbage collection.
    with open(stdout_r) as captured_stdout:
        assert captured_stdout.read() == ">>> "
    with open(stderr_r) as captured_stderr:
        assert captured_stderr.read() == ""
@pytest.mark.skipif(sys.platform == "win32", reason="Not supported on windows")
@pytest.mark.asyncio
async def test_standard_stream_pipe_buffering(is_uvloop):
    """Check back pressure when a 4 MB line goes through pipe-backed streams.

    While the large line has not been read, the pending ``drain()`` must not
    complete and the reader must hold less than the whole blob.
    """
    if is_uvloop:
        pytest.skip("This test is flaky with uvloop for some reason.")

    # stdin reads what stdout writes; the stderr pipe is never read here
    in_fd, out_fd = os.pipe()
    _err_read_fd, err_fd = os.pipe()
    stdin = open(in_fd)
    stdout = open(out_fd, "w")
    stderr = open(err_fd, "w")
    for stream in (stdin, stdout, stderr):
        assert is_pipe_transport_compatible(stream)

    reader, out_writer, _err_writer = await create_standard_streams(
        stdin, stdout, stderr
    )

    payload_size = 4 * 1024 * 1024  # 4 MB
    out_writer.write("a\n" + "b" * payload_size + "\n")
    drain_task = asyncio.ensure_future(out_writer.drain())

    short_line = await reader.readline()
    assert short_line == b"a\n"

    # Check back pressure
    await asyncio.sleep(0.1)
    assert not drain_task.done()
    assert len(reader._buffer) < payload_size

    # Reading the large line lets the pending drain finish
    long_line = await reader.readline()
    assert long_line == b"b" * payload_size + b"\n"
    await drain_task
@pytest.mark.asyncio
async def test_aprint_with_no_stdin(monkeypatch):
    """With ``sys.stdin`` set to ``None``, printing works and reading raises."""
    mock_stdio(monkeypatch, disable_stdin=True)
    lost_stdin = "ainput(): lost sys.stdin"

    await aprint("test1")
    assert sys.stdout.getvalue() == "test1\n"
    assert sys.stderr.getvalue() == ""

    # The prompt is still written even though the read itself fails
    with pytest.raises(RuntimeError) as excinfo:
        await ainput("test2")
    assert str(excinfo.value) == lost_stdin
    assert sys.stdout.getvalue() == "test1\ntest2"
    assert sys.stderr.getvalue() == ""

    # Test the methods specifically
    reader, _ = await get_standard_streams()
    for read_call in (lambda: reader.read(10), reader.readline):
        with pytest.raises(RuntimeError) as excinfo:
            await read_call()
        assert str(excinfo.value) == lost_stdin
|