File: _util.py

package info (click to toggle)
python-invoke 1.4.1%2Bds-0.1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,704 kB
  • sloc: python: 11,377; makefile: 18; sh: 12
file content (331 lines) | stat: -rw-r--r-- 9,846 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
import os
import sys

try:
    import termios
except ImportError:
    # Not available on Windows
    termios = None
from contextlib import contextmanager

from six import BytesIO, b, wraps

from mock import patch, Mock
from pytest import skip
from pytest_relaxed import trap

from invoke import Program, Runner
from invoke.terminals import WINDOWS


support = os.path.join(os.path.dirname(__file__), "_support")
ROOT = os.path.abspath(os.path.sep)


def skip_if_windows(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        if WINDOWS:
            skip()
        return fn(*args, **kwargs)

    return wrapper


@contextmanager
def support_path():
    sys.path.insert(0, support)
    try:
        yield
    finally:
        sys.path.pop(0)


def load(name):
    with support_path():
        imported = __import__(name)
        return imported


def support_file(subpath):
    with open(os.path.join(support, subpath)) as fd:
        return fd.read()


@trap
def run(invocation, program=None, invoke=True):
    """
    Run ``invocation`` via ``program``, returning output stream captures.

    ``program`` defaults to ``Program()``.

    To skip automatically assuming the argv under test starts with ``"invoke
    "``, say ``invoke=False``.

    :returns: Two-tuple of ``stdout, stderr`` strings.
    """
    if program is None:
        program = Program()
    if invoke:
        invocation = "invoke {}".format(invocation)
    program.run(invocation, exit=False)
    return sys.stdout.getvalue(), sys.stderr.getvalue()


def expect(
    invocation, out=None, err=None, program=None, invoke=True, test=None
):
    """
    Run ``invocation`` via ``program`` and expect resulting output to match.

    May give one or both of ``out``/``err`` (but not neither).

    ``program`` defaults to ``Program()``.

    To skip automatically assuming the argv under test starts with ``"invoke
    "``, say ``invoke=False``.

    To customize the operator used for testing (default: equality), use
    ``test`` (which should be an assertion wrapper of some kind).
    """
    stdout, stderr = run(invocation, program, invoke)
    # Perform tests
    if out is not None:
        if test:
            test(stdout, out)
        else:
            assert out == stdout
    if err is not None:
        if test:
            test(stderr, err)
        else:
            assert err == stderr
    # Guard against silent failures; since we say exit=False this is the only
    # real way to tell if stuff died in a manner we didn't expect.
    elif stderr:
        assert False, "Unexpected stderr: {}".format(stderr)
    return stdout, stderr


class MockSubprocess(object):
    def __init__(self, out="", err="", exit=0, isatty=None, autostart=True):
        self.out_file = BytesIO(b(out))
        self.err_file = BytesIO(b(err))
        self.exit = exit
        self.isatty = isatty
        if autostart:
            self.start()

    def start(self):
        # Start patchin'
        self.popen = patch("invoke.runners.Popen")
        Popen = self.popen.start()
        self.read = patch("os.read")
        read = self.read.start()
        self.sys_stdin = patch("sys.stdin", new_callable=BytesIO)
        sys_stdin = self.sys_stdin.start()
        # Setup mocks
        process = Popen.return_value
        process.returncode = self.exit
        process.stdout.fileno.return_value = 1
        process.stderr.fileno.return_value = 2
        # If requested, mock isatty to fake out pty detection
        if self.isatty is not None:
            sys_stdin.isatty = Mock(return_value=self.isatty)

        def fakeread(fileno, count):
            fd = {1: self.out_file, 2: self.err_file}[fileno]
            return fd.read(count)

        read.side_effect = fakeread
        # Return the Popen mock as it's sometimes wanted inside tests
        return Popen

    def stop(self):
        self.popen.stop()
        self.read.stop()
        self.sys_stdin.stop()


def mock_subprocess(out="", err="", exit=0, isatty=None, insert_Popen=False):
    def decorator(f):
        @wraps(f)
        # We have to include a @patch here to trick pytest into ignoring
        # the wrapped test's sometimes-there, sometimes-not mock_Popen arg. (It
        # explicitly "skips ahead" past what it perceives as patch args, even
        # though in our case those are not applying to the test function!)
        # Doesn't matter what we patch as long as it doesn't
        # actually get in our way.
        @patch("invoke.runners.pty")
        def wrapper(*args, **kwargs):
            proc = MockSubprocess(
                out=out, err=err, exit=exit, isatty=isatty, autostart=False
            )
            Popen = proc.start()
            args = list(args)
            args.pop()  # Pop the dummy patch
            if insert_Popen:
                args.append(Popen)
            try:
                f(*args, **kwargs)
            finally:
                proc.stop()

        return wrapper

    return decorator


def mock_pty(
    out="",
    err="",
    exit=0,
    isatty=None,
    trailing_error=None,
    skip_asserts=False,
    insert_os=False,
    be_childish=False,
    os_close_error=False,
):
    # Windows doesn't have ptys, so all the pty tests should be
    # skipped anyway...
    if WINDOWS:
        return skip_if_windows

    def decorator(f):
        import fcntl

        ioctl_patch = patch("invoke.runners.fcntl.ioctl", wraps=fcntl.ioctl)

        @wraps(f)
        @patch("invoke.runners.pty")
        @patch("invoke.runners.os")
        @ioctl_patch
        def wrapper(*args, **kwargs):
            args = list(args)
            pty, os, ioctl = args.pop(), args.pop(), args.pop()
            # Don't actually fork, but pretend we did (with "our" pid differing
            # depending on be_childish) & give 'parent fd' of 3 (typically,
            # first allocated non-stdin/out/err FD)
            pty.fork.return_value = (12345 if be_childish else 0), 3
            # We don't really need to care about waiting since not truly
            # forking/etc, so here we just return a nonzero "pid" + sentinel
            # wait-status value (used in some tests about WIFEXITED etc)
            os.waitpid.return_value = None, Mock(name="exitstatus")
            # Either or both of these may get called, depending...
            os.WEXITSTATUS.return_value = exit
            os.WTERMSIG.return_value = exit
            # If requested, mock isatty to fake out pty detection
            if isatty is not None:
                os.isatty.return_value = isatty
            out_file = BytesIO(b(out))
            err_file = BytesIO(b(err))

            def fakeread(fileno, count):
                fd = {3: out_file, 2: err_file}[fileno]
                ret = fd.read(count)
                # If asked, fake a Linux-platform trailing I/O error.
                if not ret and trailing_error:
                    raise trailing_error
                return ret

            os.read.side_effect = fakeread
            if os_close_error:
                os.close.side_effect = IOError
            if insert_os:
                args.append(os)

            # Do the thing!!!
            f(*args, **kwargs)

            # Short-circuit if we raised an error in fakeread()
            if trailing_error:
                return
            # Sanity checks to make sure the stuff we mocked, actually got ran!
            pty.fork.assert_called_with()
            # Skip rest of asserts if we pretended to be the child
            if be_childish:
                return
            # Expect a get, and then later set, of terminal window size
            assert ioctl.call_args_list[0][0][1] == termios.TIOCGWINSZ
            assert ioctl.call_args_list[1][0][1] == termios.TIOCSWINSZ
            if not skip_asserts:
                for name in ("execve", "waitpid"):
                    assert getattr(os, name).called
                # Ensure at least one of the exit status getters was called
                assert os.WEXITSTATUS.called or os.WTERMSIG.called
                # Ensure something closed the pty FD
                os.close.assert_called_once_with(3)

        return wrapper

    return decorator


class _Dummy(Runner):
    """
    Dummy runner subclass that does minimum work required to execute run().

    It also serves as a convenient basic API checker; failure to update it to
    match the current Runner API will cause TypeErrors, NotImplementedErrors,
    and similar.
    """

    # Neuter the input loop sleep, so tests aren't slow (at the expense of CPU,
    # which isn't a problem for testing).
    input_sleep = 0

    def start(self, command, shell, env, timeout=None):
        pass

    def read_proc_stdout(self, num_bytes):
        return ""

    def read_proc_stderr(self, num_bytes):
        return ""

    def _write_proc_stdin(self, data):
        pass

    def close_proc_stdin(self):
        pass

    @property
    def process_is_finished(self):
        return True

    def returncode(self):
        return 0

    def stop(self):
        pass

    @property
    def timed_out(self):
        return False


# Dummy command that will blow up if it ever truly hits a real shell.
_ = "nope"


# Runner that fakes ^C during subprocess exec
class _KeyboardInterruptingRunner(_Dummy):
    def __init__(self, *args, **kwargs):
        super(_KeyboardInterruptingRunner, self).__init__(*args, **kwargs)
        self._interrupted = False

    # Trigger KeyboardInterrupt during wait()
    def wait(self):
        if not self._interrupted:
            self._interrupted = True
            raise KeyboardInterrupt

    # But also, after that has been done, pretend subprocess shutdown happened
    # (or we will loop forever).
    def process_is_finished(self):
        return self._interrupted


class OhNoz(Exception):
    pass