File: test_Subprocess__stream_output.py

package info (click to toggle)
python-briefcase 0.3.22-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 7,300 kB
  • sloc: python: 59,405; makefile: 57
file content (156 lines) | stat: -rw-r--r-- 4,987 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import time
from threading import Event
from unittest import mock

import pytest

from briefcase.console import LogLevel
from briefcase.integrations import subprocess


@pytest.fixture()
def mock_sub(mock_sub):
    """Return the incoming ``mock_sub`` fixture with ``cleanup`` stubbed out.

    Replacing ``cleanup`` with a ``MagicMock`` lets the stream-output tests
    assert how it was invoked.
    """
    cleanup_stub = mock.MagicMock()
    mock_sub.cleanup = cleanup_stub
    return mock_sub


def test_output(mock_sub, streaming_process, sleep_zero, capsys):
    """Streaming a process echoes its output to stdout and then cleans up."""
    # fmt: off
    expected_stdout = (
        "output line 1\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on

    mock_sub.stream_output("testing", streaming_process)

    captured = capsys.readouterr()
    assert captured.out == expected_stdout
    # Cleanup must be requested exactly once, for the streamed process.
    mock_sub.cleanup.assert_called_once_with("testing", streaming_process)


def test_output_debug(mock_sub, streaming_process, sleep_zero, capsys):
    """At DEBUG verbosity, stream_output still prints only the process output."""
    # fmt: off
    expected_stdout = (
        "output line 1\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on

    # Raise the console verbosity before streaming; no extra debug lines
    # are expected in the captured output.
    mock_sub.tools.console.verbosity = LogLevel.DEBUG
    mock_sub.stream_output("testing", streaming_process)

    captured = capsys.readouterr()
    assert captured.out == expected_stdout

    mock_sub.cleanup.assert_called_once_with("testing", streaming_process)


def test_keyboard_interrupt(mock_sub, streaming_process, capsys):
    """A CTRL+C raised by ``stop_func`` propagates to the caller; the process
    output and a "Stopping..." notice are printed, and cleanup still runs."""
    # NOTE(review): unlike the sibling tests, this one does not request the
    # ``sleep_zero`` fixture - presumably so the streamer has real time to
    # flush its output before the interrupt fires; confirm before adding it.

    # fmt: off
    expected_stdout = (
        "output line 1\n"
        "\n"
        "output line 3\n"
        "Stopping...\n"
    )
    # fmt: on

    # The stop function answers "keep going" once, then simulates CTRL+C.
    interrupting_stop_func = mock.MagicMock(side_effect=[False, KeyboardInterrupt])

    with pytest.raises(KeyboardInterrupt):
        mock_sub.stream_output(
            "testing",
            streaming_process,
            stop_func=interrupting_stop_func,
        )

    captured = capsys.readouterr()
    assert captured.out == expected_stdout
    mock_sub.cleanup.assert_called_once_with("testing", streaming_process)


def test_process_exit_with_queued_output(
    mock_sub,
    streaming_process,
    sleep_zero,
    capsys,
):
    """Output is still printed in full when the process reports an early exit."""
    # fmt: off
    expected_stdout = (
        "output line 1\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on

    # poll() says "still running" once, then returns exit code -3 three times.
    streaming_process.poll.side_effect = [None] + [-3] * 3

    mock_sub.stream_output("testing", streaming_process)

    captured = capsys.readouterr()
    assert captured.out == expected_stdout
    mock_sub.cleanup.assert_called_once_with("testing", streaming_process)


@pytest.mark.parametrize("stop_func_ret_val", (True, False))
def test_stop_func(mock_sub, streaming_process, stop_func_ret_val, sleep_zero, capsys):
    """The full output is printed regardless of what ``stop_func`` returns."""

    def fixed_answer():
        """Stop function that always gives the parametrized answer."""
        return stop_func_ret_val

    # fmt: off
    expected_stdout = (
        "output line 1\n"
        "\n"
        "output line 3\n"
    )
    # fmt: on

    mock_sub.stream_output("testing", streaming_process, stop_func=fixed_answer)

    captured = capsys.readouterr()
    assert captured.out == expected_stdout
    mock_sub.cleanup.assert_called_once_with("testing", streaming_process)


def test_stuck_streamer(mock_sub, streaming_process, sleep_zero, monkeypatch, capsys):
    """Following a KeyboardInterrupt, output streaming returns even if the output
    streamer becomes stuck.

    The streamer's ``run`` method is replaced with one that blocks on an event,
    ``stop_func`` raises ``KeyboardInterrupt`` on its second call, and the test
    then checks that ``stream_output`` returned without waiting for the blocked
    streamer and printed the "hasn't terminated" warning.
    """

    # Mock time.time() to return times that monotonically increase by 1s
    # every time it is invoked. This allows us to simulate the progress of
    # time much faster than the actual calls to time.sleep() would.
    # Only five values (1000..1004) are supplied, so the mock can be called
    # at most five times before its side_effect iterator is exhausted.
    mock_time = mock.MagicMock(side_effect=range(1000, 1005))
    monkeypatch.setattr(time, "time", mock_time)

    # Flag that Briefcase has finished simulating its waiting on the output
    # streamer to exit normally; so, it should now exit.
    monkeypatched_streamer_should_exit = Event()
    # Flag that Briefcase waited too long on the output streamer
    monkeypatched_streamer_was_improperly_awaited = Event()
    # Flag that output streamer has exited
    monkeypatched_streamer_exited = Event()

    def monkeypatched_blocked_streamer(*a, **kw):
        """Simulate a streamer that blocks longer than it will be waited on.

        Blocks for up to 5 seconds waiting to be told to exit. If that wait
        times out, the "improperly awaited" flag is set. The "exited" flag is
        always set on the way out.
        """
        if not monkeypatched_streamer_should_exit.wait(timeout=5):
            monkeypatched_streamer_was_improperly_awaited.set()
        monkeypatched_streamer_exited.set()

    # Swap the real streaming loop for the blocking stand-in; as a result no
    # process output is printed in this test.
    monkeypatch.setattr(
        subprocess.PopenOutputStreamer,
        "run",
        monkeypatched_blocked_streamer,
    )

    # The stop function answers "keep going" once, then simulates CTRL+C.
    send_ctrl_c = mock.MagicMock()
    send_ctrl_c.side_effect = [False, KeyboardInterrupt]
    with pytest.raises(KeyboardInterrupt):
        mock_sub.stream_output("testing", streaming_process, stop_func=send_ctrl_c)

    # stream_output() has returned; release the blocked streamer so that its
    # thread can finish.
    monkeypatched_streamer_should_exit.set()

    # Since the waiting around for the output streamer has been
    # short-circuited, Briefcase should quickly give up waiting on
    # the output streamer and it should exit...so, confirm it does.
    assert monkeypatched_streamer_exited.wait(timeout=1)
    assert not monkeypatched_streamer_was_improperly_awaited.is_set()

    # Only the interrupt notice and the stuck-streamer warning are expected.
    # fmt: off
    assert capsys.readouterr().out == (
        "Stopping...\n"
        "Log stream hasn't terminated; log output may be corrupted.\n"
    )
    # fmt: on