File: test_PopenOutputStreamer.py

package info (click to toggle)
python-briefcase 0.3.22-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 7,300 kB
  • sloc: python: 59,405; makefile: 57
file content (333 lines) | stat: -rw-r--r-- 8,850 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import threading
from io import StringIO

import pytest

from briefcase.console import Console
from briefcase.integrations import subprocess
from briefcase.integrations.subprocess import PopenOutputStreamer


@pytest.fixture
def streamer(streaming_process):
    return PopenOutputStreamer(
        label="test",
        popen_process=streaming_process,
        console=Console(),
    )


def test_output(streamer, capsys):
    """Process output is printed."""
    streamer.start()
    streamer.join(timeout=5)

    # fmt: off
    assert capsys.readouterr().out == (
        "output line 1\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on


def test_stdout_closes_unexpectedly(streamer, monkeypatch, capsys):
    """Streamer exits from ValueError because stdout was closed."""

    def monkeypatch_ensure_str(value):
        """Close stdout when ensure_str() runs on output from readline()."""
        streamer.popen_process.stdout.close()
        return value

    monkeypatch.setattr(subprocess, "ensure_str", monkeypatch_ensure_str)
    streamer.popen_process.stdout = StringIO("output line 1\noutput line 2")

    streamer.start()
    streamer.join(timeout=5)

    assert capsys.readouterr().out == (
        "output line 1\n"
        "WARNING: stdout was unexpectedly closed while streaming output\n"
    )


def test_readline_raises_exception(streamer, monkeypatch, capsys):
    """Streamer aborts if readline() raises ValueError for reasons other than stdout
    closing."""

    def monkeypatch_ensure_str(value):
        """Simulate readline() raising an ValueError-derived exception."""
        raise UnicodeError("readline() exception")

    monkeypatch.setattr(subprocess, "ensure_str", monkeypatch_ensure_str)

    streamer.start()
    streamer.join(timeout=5)

    assert capsys.readouterr().out == (
        "Error while streaming output: UnicodeError: readline() exception\n"
    )


def test_request_stop(streamer, capsys):
    """Requesting the streamer to stop sets the stop flag."""
    streamer.start()
    streamer.join(timeout=5)

    # fmt: off
    assert capsys.readouterr().out == (
        "output line 1\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on

    assert not streamer.stop_flag.is_set()

    streamer.request_stop()

    assert streamer.stop_flag.is_set()


def test_request_stop_set_immediately(streamer, capsys):
    """Nothing is printed if stop flag is immediately set."""
    streamer.request_stop()

    streamer.start()
    streamer.join(timeout=5)

    assert capsys.readouterr().out == ""


def test_request_stop_set_during_output(streamer, monkeypatch, capsys):
    """Streamer prints nothing more after stop flag is set."""

    def filter_func(value):
        """Simulate stop flag set while output is being read."""
        streamer.request_stop()
        yield value

    streamer.filter_func = filter_func

    streamer.start()
    streamer.join(timeout=5)

    assert capsys.readouterr().out == "output line 1\n"


def test_captured_output(streamer, capsys):
    """Captured output is available after streaming."""
    streamer.capture_output = True

    streamer.start()
    streamer.join(timeout=5)

    assert capsys.readouterr().out == ""

    # fmt: off
    assert streamer.captured_output == (
        "output line 1\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on


def test_captured_output_interim(streamer, monkeypatch, capsys):
    """Captured output is available during streaming."""
    streamer.capture_output = True

    streamer_is_waiting = threading.Event()
    continue_streaming = threading.Event()

    def mock_readline():
        """Wait on asserts while returning output lines."""
        yield "output line 1\n"
        yield "output line 2\n"
        streamer_is_waiting.set()
        continue_streaming.wait(timeout=5)
        yield "output line 3\n"
        yield "output line 4\n"
        yield ""

    streamer.popen_process.stdout.readline.side_effect = mock_readline()

    streamer.start()
    streamer_is_waiting.wait(timeout=5)

    assert streamer.captured_output == "output line 1\noutput line 2\n"

    continue_streaming.set()
    streamer.join(timeout=5)

    assert streamer.captured_output == "output line 3\noutput line 4\n"


def test_filter_func(streamer, capsys):
    """A filter can be added to modify an output stream."""

    # Define a filter function that converts "output" into "filtered"
    def filter_func(line):
        yield line.replace("output", "filtered")

    streamer.filter_func = filter_func

    streamer.start()
    streamer.join(timeout=5)

    # fmt: off
    # Output has been transformed
    assert capsys.readouterr().out == (
        "filtered line 1\n"
        "\n"
        "filtered line 3\n"
    )
    # fmt: on


def test_filter_func_reject(streamer, capsys):
    """A filter that rejects lines can be added to modify an output stream."""

    # Define a filter function that ignores blank lines
    def filter_func(line):
        if len(line) == 0:
            return
        yield line

    streamer.filter_func = filter_func

    streamer.start()
    streamer.join(timeout=5)

    # fmt: off
    # Output has been transformed
    assert capsys.readouterr().out == (
        "output line 1\n"
        "output line 3\n"
    )
    # fmt: on


def test_filter_func_line_ends(streamer, sleep_zero, capsys):
    """Filter functions are not provided the newline."""

    # Define a filter function that redacts lines that end with 1
    # The newline is *not* included.
    def filter_func(line):
        if line.endswith("line 1"):
            yield line.replace("line 1", "**REDACTED**")
        else:
            yield line

    streamer.filter_func = filter_func

    streamer.start()
    streamer.join(timeout=5)

    # fmt: off
    # Output has been transformed; newline exists in output
    assert capsys.readouterr().out == (
        "output **REDACTED**\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on


def test_filter_func_line_multiple_output(streamer, capsys):
    """Filter functions can generate multiple lines from a single input."""

    # Define a filter function that adds an extra line of content when the
    # lines that end with 1
    def filter_func(line):
        yield line
        if line.endswith("line 1"):
            yield "Extra content!"

    streamer.filter_func = filter_func

    streamer.start()
    streamer.join(timeout=5)

    # fmt: off
    # Output has been transformed; newline exists in output
    assert capsys.readouterr().out == (
        "output line 1\n"
        "Extra content!\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on


def test_filter_func_stop_iteration(streamer, capsys):
    """A filter can indicate that logging should stop."""

    # Define a filter function that converts "output" into "filtered",
    # and terminates streaming when a blank line is seen.
    def filter_func(line):
        if line == "":
            raise subprocess.StopStreaming()
        yield line.replace("output", "filtered")

    streamer.filter_func = filter_func

    streamer.start()
    streamer.join(timeout=5)

    # fmt: off
    # Output has been transformed, but is truncated when the empty line was received.
    assert capsys.readouterr().out == (
        "filtered line 1\n"
    )
    # fmt: on


def test_filter_func_output_and_stop_iteration(streamer, capsys):
    """A filter can indicate that logging should stop, and also output content."""

    # Define a filter function that converts "output" into "filtered",
    # and terminates streaming when a blank line is seen; but outputs
    # one more line before terminating.
    def filter_func(line):
        if line == "":
            yield "This should be the last line"
            raise subprocess.StopStreaming()
        yield line.replace("output", "filtered")

    streamer.filter_func = filter_func

    streamer.start()
    streamer.join(timeout=5)

    # fmt: off
    # Output has been transformed, but is truncated when the empty line was received.
    assert capsys.readouterr().out == (
        "filtered line 1\n"
        "This should be the last line\n"
    )
    # fmt: on


def test_filter_func_line_unexpected_error(streamer, capsys):
    """If a filter function fails, the error is caught and logged."""

    # Define a filter function that raises a RunTimeError
    # The newline is *not* included.
    def filter_func(line):
        if not line:
            raise RuntimeError("Like something totally went wrong")
        yield line

    streamer.filter_func = filter_func

    streamer.start()
    streamer.join(timeout=5)

    # fmt: off
    # Exception
    assert capsys.readouterr().out == (
        "output line 1\n"
        "Error while streaming output: RuntimeError: Like something totally went wrong\n"
    )
    # fmt: on