1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
|
import threading
from io import StringIO
import pytest
from briefcase.console import Console
from briefcase.integrations import subprocess
from briefcase.integrations.subprocess import PopenOutputStreamer
@pytest.fixture
def streamer(streaming_process):
    """Return an unstarted ``PopenOutputStreamer`` wrapping ``streaming_process``.

    The streamer is labelled ``"test"`` and is given a freshly created
    ``Console``; ``streaming_process`` is a fixture defined outside this module.
    """
    console = Console()
    return PopenOutputStreamer(
        label="test",
        popen_process=streaming_process,
        console=console,
    )
def test_output(streamer, capsys):
    """The lines produced by the process show up on stdout."""
    # Run the streamer thread and wait (up to 5 seconds) for it to finish.
    streamer.start()
    streamer.join(timeout=5)

    printed = capsys.readouterr().out

    # fmt: off
    expected = (
        "output line 1\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on
    assert printed == expected
def test_stdout_closes_unexpectedly(streamer, monkeypatch, capsys):
    """A warning is printed when stdout is closed while output is being streamed."""

    def close_stdout_then_passthrough(value):
        """Stand-in for ensure_str(): close the process stdout, return the input."""
        streamer.popen_process.stdout.close()
        return value

    # Give the process a real text stream so that closing it has a real effect
    # on later reads (reading a closed StringIO raises ValueError).
    streamer.popen_process.stdout = StringIO("output line 1\noutput line 2")
    monkeypatch.setattr(subprocess, "ensure_str", close_stdout_then_passthrough)

    streamer.start()
    streamer.join(timeout=5)

    printed = capsys.readouterr().out
    expected = (
        "output line 1\n"
        "WARNING: stdout was unexpectedly closed while streaming output\n"
    )
    assert printed == expected
def test_readline_raises_exception(streamer, monkeypatch, capsys):
    """A ValueError unrelated to stdout closing is reported as a streaming error."""

    def raise_unicode_error(value):
        """Stand-in for ensure_str() that always raises a ValueError subclass."""
        raise UnicodeError("readline() exception")

    monkeypatch.setattr(subprocess, "ensure_str", raise_unicode_error)

    streamer.start()
    streamer.join(timeout=5)

    printed = capsys.readouterr().out
    expected = "Error while streaming output: UnicodeError: readline() exception\n"
    assert printed == expected
def test_request_stop(streamer, capsys):
    """Calling request_stop() sets the streamer's stop flag."""
    streamer.start()
    streamer.join(timeout=5)

    # The full output is printed first; the stop flag is still clear afterwards.
    # fmt: off
    expected = (
        "output line 1\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on
    printed = capsys.readouterr().out
    assert printed == expected

    assert not streamer.stop_flag.is_set()

    streamer.request_stop()

    assert streamer.stop_flag.is_set()
def test_request_stop_set_immediately(streamer, capsys):
    """A streamer asked to stop before it starts prints nothing."""
    # Request the stop before the thread is started.
    streamer.request_stop()

    streamer.start()
    streamer.join(timeout=5)

    printed = capsys.readouterr().out
    assert printed == ""
def test_request_stop_set_during_output(streamer, monkeypatch, capsys):
    """Once the stop flag is set mid-stream, no further output is printed."""

    def stop_then_passthrough(line):
        """Request a stop while a line is being filtered, then yield it unchanged."""
        streamer.request_stop()
        yield line

    streamer.filter_func = stop_then_passthrough

    streamer.start()
    streamer.join(timeout=5)

    # Only the line that was in flight when the stop was requested is printed.
    printed = capsys.readouterr().out
    assert printed == "output line 1\n"
def test_captured_output(streamer, capsys):
    """With capture enabled, output is stored on the streamer instead of printed."""
    streamer.capture_output = True

    streamer.start()
    streamer.join(timeout=5)

    # Nothing reaches stdout ...
    printed = capsys.readouterr().out
    assert printed == ""

    # ... because the output is available from the streamer itself.
    # fmt: off
    expected = (
        "output line 1\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on
    assert streamer.captured_output == expected
def test_captured_output_interim(streamer, monkeypatch, capsys):
    """Captured output can be read while streaming is still in progress.

    The two assertions on ``captured_output`` expect each read to return only
    the lines captured since the previous read.
    """
    streamer.capture_output = True

    # Events used to hand control back and forth between this test and the
    # streamer thread.
    streamer_is_waiting = threading.Event()
    continue_streaming = threading.Event()

    def mock_readline():
        """Yield output lines, pausing after the second until the test resumes."""
        yield "output line 1\n"
        yield "output line 2\n"
        # Reached when a third line is requested: tell the test the first two
        # lines have been handed over, then block until it lets us continue.
        streamer_is_waiting.set()
        continue_streaming.wait(timeout=5)
        yield "output line 3\n"
        yield "output line 4\n"
        # Final value: an empty string, the conventional end-of-file result
        # from readline().
        yield ""

    # Each call to readline() returns the next value from the generator
    # (stdout.readline is presumably a mock, since side_effect is assigned).
    streamer.popen_process.stdout.readline.side_effect = mock_readline()

    streamer.start()

    # Event.wait() returns False on timeout; fail here with a clear message
    # instead of letting the captured-output assertion fail misleadingly.
    assert streamer_is_waiting.wait(timeout=5), "streamer never reached the pause"

    assert streamer.captured_output == "output line 1\noutput line 2\n"

    continue_streaming.set()
    streamer.join(timeout=5)

    assert streamer.captured_output == "output line 3\noutput line 4\n"
def test_filter_func(streamer, capsys):
    """A filter function can rewrite the output stream."""

    def output_to_filtered(line):
        """Yield the line with every "output" replaced by "filtered"."""
        yield line.replace("output", "filtered")

    streamer.filter_func = output_to_filtered

    streamer.start()
    streamer.join(timeout=5)

    printed = capsys.readouterr().out

    # Every line has passed through the filter.
    # fmt: off
    expected = (
        "filtered line 1\n"
        "\n"
        "filtered line 3\n"
    )
    # fmt: on
    assert printed == expected
def test_filter_func_reject(streamer, capsys):
    """A filter function can drop lines from the output stream."""

    def skip_blank_lines(line):
        """Yield only non-empty lines; empty lines produce nothing."""
        if line:
            yield line

    streamer.filter_func = skip_blank_lines

    streamer.start()
    streamer.join(timeout=5)

    printed = capsys.readouterr().out

    # The blank line between the two content lines has been removed.
    # fmt: off
    expected = (
        "output line 1\n"
        "output line 3\n"
    )
    # fmt: on
    assert printed == expected
def test_filter_func_line_ends(streamer, sleep_zero, capsys):
    """The line handed to a filter function does not include the newline."""

    def redact_line_one(line):
        """Redact a line ending in "line 1"; pass any other line through.

        The ``endswith`` check only matches if the trailing newline is absent.
        """
        yield (
            line.replace("line 1", "**REDACTED**")
            if line.endswith("line 1")
            else line
        )

    streamer.filter_func = redact_line_one

    streamer.start()
    streamer.join(timeout=5)

    printed = capsys.readouterr().out

    # The first line was redacted, and the newlines are present in the output.
    # fmt: off
    expected = (
        "output **REDACTED**\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on
    assert printed == expected
def test_filter_func_line_multiple_output(streamer, capsys):
    """A filter function can emit several output lines for one input line."""

    def append_extra_after_line_one(line):
        """Yield the line; follow it with an extra line if it ends in "line 1"."""
        yield line
        needs_extra = line.endswith("line 1")
        if needs_extra:
            yield "Extra content!"

    streamer.filter_func = append_extra_after_line_one

    streamer.start()
    streamer.join(timeout=5)

    printed = capsys.readouterr().out

    # The extra line follows the first line, and each line ends with a newline.
    # fmt: off
    expected = (
        "output line 1\n"
        "Extra content!\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on
    assert printed == expected
def test_filter_func_stop_iteration(streamer, capsys):
    """A filter function can signal that streaming should stop."""

    def filter_until_blank(line):
        """Replace "output" with "filtered"; raise StopStreaming on a blank line."""
        if line == "":
            raise subprocess.StopStreaming()
        else:
            yield line.replace("output", "filtered")

    streamer.filter_func = filter_until_blank

    streamer.start()
    streamer.join(timeout=5)

    printed = capsys.readouterr().out

    # Only the line before the blank line is printed.
    # fmt: off
    expected = (
        "filtered line 1\n"
    )
    # fmt: on
    assert printed == expected
def test_filter_func_output_and_stop_iteration(streamer, capsys):
    """A filter function can emit a final line and then stop the streaming."""

    def filter_with_parting_line(line):
        """Replace "output" with "filtered"; on a blank line, emit one last line
        and then raise StopStreaming."""
        if line == "":
            yield "This should be the last line"
            raise subprocess.StopStreaming()
        else:
            yield line.replace("output", "filtered")

    streamer.filter_func = filter_with_parting_line

    streamer.start()
    streamer.join(timeout=5)

    printed = capsys.readouterr().out

    # The parting line is printed, and nothing follows it.
    # fmt: off
    expected = (
        "filtered line 1\n"
        "This should be the last line\n"
    )
    # fmt: on
    assert printed == expected
def test_filter_func_line_unexpected_error(streamer, capsys):
    """An unexpected exception from a filter function is reported in the output."""

    def fail_on_blank_line(line):
        """Raise RuntimeError for an empty line; pass any other line through."""
        if not line:
            raise RuntimeError("Like something totally went wrong")
        yield line

    streamer.filter_func = fail_on_blank_line

    streamer.start()
    streamer.join(timeout=5)

    printed = capsys.readouterr().out

    # The first line is printed, then the error raised on the blank line.
    # fmt: off
    expected = (
        "output line 1\n"
        "Error while streaming output: RuntimeError: Like something totally went wrong\n"
    )
    # fmt: on
    assert printed == expected
|