1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
|
import shlex
from functools import wraps, partial, reduce
from io import TextIOWrapper
from itertools import chain, zip_longest
from os import PathLike, fsdecode
from subprocess import STDOUT, PIPE
from tempfile import TemporaryFile
from typing import Union, Callable, List, Optional, Sequence, Tuple, Dict, Iterable

from testfixtures.utils import extend_docstring

from .mock import Mock, call, _Call as Call
# Either flavour of string data.
AnyStr = Union[str, bytes]

# The shapes of command accepted by shell_join(): a single value or a
# sequence of parts.
Command = Union[str, bytes, PathLike, Sequence[str], Sequence[bytes]]


def shell_join(command: Command) -> str:
    """
    Return a single :class:`str` for ``command``.

    A :class:`str` is returned unchanged, :class:`bytes` are decoded and an
    :class:`os.PathLike` is converted through the file system path protocol.
    Any other iterable is treated as a sequence of parts: each part is
    converted in the same way, quoted with :func:`shlex.quote` and the results
    are joined with single spaces.

    :param command: The command, either as one value or as an iterable of parts.
    :return: The command as one string.
    :raises TypeError:
        If ``command``, or one of its parts, is not a :class:`str`,
        :class:`bytes` or :class:`os.PathLike`.
    """
    if isinstance(command, str):
        return command
    if isinstance(command, bytes):
        return command.decode()
    if isinstance(command, PathLike):
        # fsdecode() honours __fspath__, so path-like objects without a
        # meaningful __str__ (or with a bytes path) still give a usable string.
        return fsdecode(command)
    if not isinstance(command, Iterable):
        raise TypeError(f'{command!r} was {type(command)}, must be str')

    quoted_parts = []
    for part in command:
        if isinstance(part, str):
            text = part
        elif isinstance(part, bytes):
            text = part.decode()
        elif isinstance(part, PathLike):
            text = fsdecode(part)
        else:
            raise TypeError(f'{part!r} in {command} was {type(part)}, must be str')
        quoted_parts.append(shlex.quote(text))
    return " ".join(quoted_parts)
class PopenBehaviour:
    """
    The outcome a :class:`MockPopen` replays when it simulates a
    particular command.

    Instances are plain value holders: every constructor argument is stored
    unchanged on the attribute of the same name.
    """

    def __init__(
            self,
            stdout: bytes = b'',
            stderr: bytes = b'',
            returncode: int = 0,
            pid: int = 1234,
            poll_count: int = 3
    ):
        # Simulated content of the two output streams.
        self.stdout, self.stderr = stdout, stderr
        # Simulated return code and process id.
        self.returncode, self.pid = returncode, pid
        # Copied onto each mock process, where poll() counts it down.
        self.poll_count = poll_count
def record(func) -> Callable:
    """
    Decorate a method so each call is reported before the method runs.

    The wrapper passes a one-element tuple holding the method's name,
    followed by the call's positional and keyword arguments, to
    ``self._record`` and then returns whatever the wrapped method returns.
    """
    call_path = (func.__name__,)

    @wraps(func)
    def recorder(self, *args, **kw):
        self._record(call_path, *args, **kw)
        result = func(self, *args, **kw)
        return result

    return recorder
class MockPopenInstance:
    """
    A mock process as returned by :class:`MockPopen`.

    The behaviour configured for the command is looked up when the instance
    is created. Calls to the process methods are recorded both on this
    instance and on the :class:`MockPopen` that created it.
    """

    #: A :class:`~unittest.mock.Mock` representing the pipe into this process.
    #: This is only set if ``stdin=PIPE`` is passed to the constructor.
    #: The mock records writes and closes in :attr:`MockPopen.all_calls`.
    stdin: Mock = None

    #: A file representing standard output from this process.
    stdout: TemporaryFile = None

    #: A file representing error output from this process.
    stderr: TemporaryFile = None

    # The parameters are not typed as instantiation of this class is an
    # internal implementation detail.
    def __init__(self, mock_class, root_call,
                 args, bufsize=0, executable=None,
                 stdin=None, stdout=None, stderr=None,
                 preexec_fn=None, close_fds=False, shell=False, cwd=None,
                 env=None, universal_newlines=False,
                 startupinfo=None, creationflags=0, restore_signals=True,
                 start_new_session=False, pass_fds=(),
                 encoding=None, errors=None, text=None):
        """
        Create the mock process for one call to a :class:`MockPopen`.

        :param mock_class: The :class:`MockPopen` that is creating this instance.
        :param root_call: The ``call`` object recorded for the instantiation.

        The remaining parameters are those passed to the :class:`MockPopen`
        call. Only ``args``, ``stdin``, ``stdout``, ``stderr``,
        ``universal_newlines``, ``encoding``, ``errors`` and ``text`` are used
        here; the others are accepted and ignored.

        :raises KeyError:
            If there is neither a behaviour for the command nor a default one.
        """
        self.mock: Mock = Mock()
        self.class_instance_mock: Mock = mock_class.mock.Popen_instance
        #: A :func:`unittest.mock.call` representing the call made to instantiate
        #: this mock process.
        self.root_call: Call = root_call
        #: The calls made on this mock process, represented using
        #: :func:`~unittest.mock.call` instances.
        self.calls: List[Call] = []
        # Shared with the creating MockPopen, so calls on every process it
        # returns end up in one list.
        self.all_calls: List[Call] = mock_class.all_calls

        cmd = shell_join(args)
        behaviour = mock_class.commands.get(cmd, mock_class.default_behaviour)
        if behaviour is None:
            raise KeyError('Nothing specified for command %r' % cmd)
        if callable(behaviour):
            # A callable is asked to supply the behaviour for this invocation.
            behaviour = behaviour(command=cmd, stdin=stdin)
        self.behaviour: PopenBehaviour = behaviour

        stdout_value = behaviour.stdout
        stderr_value = behaviour.stderr
        if stderr == STDOUT:
            # Merge the two streams by alternating their lines; zip_longest
            # pads the shorter stream with None, which is filtered out below.
            line_iterator = chain.from_iterable(zip_longest(
                stdout_value.splitlines(True),
                stderr_value.splitlines(True)
            ))
            stdout_value = b''.join(line for line in line_iterator if line)
            stderr_value = None

        self.poll_count: int = behaviour.poll_count

        # subprocess.Popen opens its pipes in text mode when any of these is
        # given, including ``errors`` on its own.
        text_mode = universal_newlines or text or encoding or errors
        for name, option, mock_value in (
                ('stdout', stdout, stdout_value),
                ('stderr', stderr, stderr_value)
        ):
            value = None
            if option is PIPE:
                value = TemporaryFile()
                value.write(mock_value)
                value.flush()
                value.seek(0)
                if text_mode:
                    value = TextIOWrapper(value, encoding=encoding, errors=errors)
            setattr(self, name, value)

        if stdin == PIPE:
            self.stdin = Mock()
            for method in 'write', 'close':
                # Route stdin.write() and stdin.close() through _record so
                # they show up alongside the other recorded calls.
                record_writes = partial(self._record, ('stdin', method))
                getattr(self.stdin, method).side_effect = record_writes

        self.pid: int = behaviour.pid
        #: The return code of this mock process.
        self.returncode: Optional[int] = None
        self.args: Command = args

    def _record(self, names, *args, **kw):
        """
        Record a call to the attribute path ``names`` with the given arguments.

        The call is made on both mocks and a matching ``call`` object is
        appended to :attr:`calls` and to the shared ``all_calls`` list.
        """
        for mock in self.class_instance_mock, self.mock:
            reduce(getattr, names, mock)(*args, **kw)
        for base_call, store in (
                (call, self.calls),
                (self.root_call, self.all_calls)
        ):
            store.append(reduce(getattr, names, base_call)(*args, **kw))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Leaving the context waits for the process, which is recorded like
        # any other call to wait(), and then closes the output streams.
        self.wait()
        for stream in self.stdout, self.stderr:
            if stream:
                stream.close()

    @record
    def wait(self, timeout: float = None) -> int:
        """
        Simulate calls to :meth:`subprocess.Popen.wait`.

        Sets :attr:`returncode` to the configured return code and returns it.
        ``timeout`` is recorded but otherwise ignored.
        """
        self.returncode = self.behaviour.returncode
        return self.returncode

    @record
    def communicate(self, input: AnyStr = None, timeout: float = None) -> Tuple[AnyStr, AnyStr]:
        """
        Simulate calls to :meth:`subprocess.Popen.communicate`.

        Sets :attr:`returncode` and returns a ``(stdout, stderr)`` tuple in
        which each item is what is read from the corresponding stream, or
        ``None`` when that stream was not set up. ``input`` and ``timeout``
        are recorded but otherwise ignored.
        """
        self.returncode = self.behaviour.returncode
        return (self.stdout and self.stdout.read(),
                self.stderr and self.stderr.read())

    @record
    def poll(self) -> Optional[int]:
        """
        Simulate calls to :meth:`subprocess.Popen.poll`.

        Returns ``None`` while polls remain and no return code has been set;
        after that it sets and returns :attr:`returncode`.
        """
        if self.poll_count and self.returncode is None:
            self.poll_count -= 1
            return None
        # There is no real process to finish, so once the configured number
        # of polls is used up the fixture sets the return code itself.
        self.returncode = self.behaviour.returncode
        return self.returncode

    @record
    def send_signal(self, signal: int) -> None:
        "Simulate calls to :meth:`subprocess.Popen.send_signal`"
        pass

    @record
    def terminate(self) -> None:
        "Simulate calls to :meth:`subprocess.Popen.terminate`"
        pass

    @record
    def kill(self) -> None:
        "Simulate calls to :meth:`subprocess.Popen.kill`"
        pass
class MockPopen:
    """
    A specialised mock for testing code that uses :class:`subprocess.Popen`.

    Use an instance wherever :class:`subprocess.Popen` would be used; it is
    typically put in place with :func:`unittest.mock.patch` or a
    :class:`~testfixtures.Replacer`.
    """

    # Used for commands with no entry in ``commands``; None until
    # set_default() is called.
    default_behaviour: PopenBehaviour = None

    def __init__(self):
        # Behaviours keyed by the string form of their command.
        self.commands: Dict[str, PopenBehaviour] = {}
        self.mock: Mock = Mock()
        #: All calls made using this mock and the objects it returns, represented using
        #: :func:`~unittest.mock.call` instances.
        self.all_calls: List[Call] = []

    def _resolve_behaviour(self, stdout, stderr, returncode,
                           pid, poll_count, behaviour):
        # An explicit behaviour wins; the individual values are only used to
        # build a PopenBehaviour when none was supplied.
        if behaviour is not None:
            return behaviour
        return PopenBehaviour(stdout, stderr, returncode, pid, poll_count)

    def set_command(
            self,
            command: str,
            stdout: bytes = b'',
            stderr: bytes = b'',
            returncode: int = 0,
            pid: int = 1234,
            poll_count: int = 3,
            behaviour: Union[PopenBehaviour, Callable] = None
    ):
        """
        Set the behaviour of this mock when it is used to simulate the
        specified command.

        :param command: A :class:`str` representing the command to be simulated.
        """
        resolved = self._resolve_behaviour(
            stdout, stderr, returncode, pid, poll_count, behaviour
        )
        key = shell_join(command)
        self.commands[key] = resolved

    def set_default(self, stdout=b'', stderr=b'', returncode=0,
                    pid=1234, poll_count=3, behaviour=None):
        """
        Set the behaviour of this mock when it is used to simulate commands
        that have no explicit behavior specified using
        :meth:`~MockPopen.set_command`.
        """
        resolved = self._resolve_behaviour(
            stdout, stderr, returncode, pid, poll_count, behaviour
        )
        self.default_behaviour = resolved

    def __call__(self, *args, **kw):
        # Register the instantiation on the mock and in the call list before
        # building the mock process.
        self.mock.Popen(*args, **kw)
        popen_call = call.Popen(*args, **kw)
        self.all_calls.append(popen_call)
        return MockPopenInstance(self, popen_call, *args, **kw)
# Parameter documentation shared by MockPopen.set_command and
# MockPopen.set_default. Field names start in the first column and their
# descriptions are indented so the text forms a valid reST field list.
set_command_params = """
:param stdout:
    :class:`bytes` representing the simulated content written by the process
    to the stdout pipe.
:param stderr:
    :class:`bytes` representing the simulated content written by the process
    to the stderr pipe.
:param returncode:
    An integer representing the return code of the simulated process.
:param pid:
    An integer representing the process identifier of the simulated
    process. This is useful if you have code that prints out the pids
    of running processes.
:param poll_count:
    Specifies the number of times :meth:`~MockPopenInstance.poll` can be
    called before :attr:`~MockPopenInstance.returncode` is set and returned
    by :meth:`~MockPopenInstance.poll`.

If supplied, ``behaviour`` must be either a :class:`PopenBehaviour`
instance or a callable that takes the ``command`` string representing
the command to be simulated and the ``stdin`` supplied when instantiating
the :class:`subprocess.Popen` with that command and should
return a :class:`PopenBehaviour` instance.
"""

# add the param docs, so we only have one copy of them!
extend_docstring(set_command_params,
                 [MockPopen.set_command, MockPopen.set_default])
|