1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
|
"""ProcessDispatcher class declaration"""
import asyncio
import subprocess
import sys
import typing
from collections import deque
from copy import deepcopy
from functools import partial
from typing import Any as AnyType
from typing import AnyStr
from typing import Awaitable
from typing import Callable
from typing import Deque
from typing import Dict
from typing import Generic
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import Union
from . import asyncio_subprocess
from . import exceptions
from .fake_popen import AsyncFakePopen
from .fake_popen import FakePopen
from .types import COMMAND
from .utils import Command
if typing.TYPE_CHECKING:
from .fake_process import FakeProcess
__all__ = ["ProcessDispatcher"]
class ProcessDispatcher:
"""Main class for handling processes."""
process_list: List["FakeProcess"] = []
built_in_popen: Optional[Callable] = None
built_in_async_subprocess: Optional[AnyType] = None
_allow_unregistered: bool = False
_cache: Dict["FakeProcess", Dict["FakeProcess", AnyType]] = dict()
_keep_last_process: bool = False
_pid: int = 0
@classmethod
def register(cls, process: "FakeProcess") -> None:
if not cls.process_list:
cls.built_in_popen = subprocess.Popen
subprocess.Popen = FakePopenWrapper # type: ignore
cls.built_in_async_subprocess = asyncio.subprocess
asyncio.create_subprocess_shell = cls.async_shell # type: ignore
asyncio.create_subprocess_exec = cls.async_exec # type: ignore
asyncio.subprocess = asyncio_subprocess
cls._cache[process] = {
proc: deepcopy(proc.definitions) for proc in cls.process_list
}
cls.process_list.append(process)
@classmethod
def deregister(cls, process: "FakeProcess") -> None:
cls.process_list.remove(process)
cache = cls._cache.pop(process)
for proc, processes in cache.items():
proc.definitions = processes
if not cls.process_list:
subprocess.Popen = cls.built_in_popen # type: ignore
cls.built_in_popen = None
if cls.built_in_async_subprocess is None:
raise exceptions.PluginInternalError
asyncio.subprocess = cls.built_in_async_subprocess
asyncio.create_subprocess_shell = (
cls.built_in_async_subprocess.create_subprocess_shell
)
asyncio.create_subprocess_exec = (
cls.built_in_async_subprocess.create_subprocess_exec
)
cls.built_in_async_subprocess = None
@classmethod
def dispatch(
cls, command: COMMAND, **kwargs: Optional[Dict]
) -> Union[FakePopen, subprocess.Popen]:
"""This method will be used instead of the subprocess.Popen()"""
process = cls.__dispatch(command)
if process is None:
if cls.built_in_popen is None:
raise exceptions.PluginInternalError
popen: subprocess.Popen = cls.built_in_popen(command, **kwargs)
return popen
result = cls._prepare_instance(FakePopen, command, kwargs, process)
if not isinstance(result, FakePopen):
raise exceptions.PluginInternalError
result.run_thread()
return result
@classmethod
async def async_shell(
cls, cmd: Union[str, bytes], **kwargs: Dict
) -> Union[AsyncFakePopen, asyncio.subprocess.Process]:
"""Replacement of asyncio.create_subprocess_shell()"""
if not isinstance(cmd, (str, bytes)):
raise ValueError("cmd must be a string")
method = partial(
cls.built_in_async_subprocess.create_subprocess_shell, # type: ignore
cmd,
**kwargs
)
if isinstance(cmd, bytes):
cmd = cmd.decode()
return await cls._call_async(cmd, method, kwargs)
@classmethod
async def async_exec(
cls, program: Union[str, bytes], *args: Union[str, bytes], **kwargs: Dict
) -> Union[AsyncFakePopen, asyncio.subprocess.Process]:
"""Replacement of asyncio.create_subprocess_exec()"""
if not isinstance(program, (str, bytes)):
raise ValueError("program must be a string")
method = partial(
cls.built_in_async_subprocess.create_subprocess_exec, # type: ignore
program,
*args,
**kwargs
)
if isinstance(program, bytes):
program = program.decode()
command = [
program,
*[arg.decode() if isinstance(arg, bytes) else arg for arg in args],
]
return await cls._call_async(command, method, kwargs)
@classmethod
async def _call_async(
cls,
command: COMMAND,
async_method: Callable,
kwargs: Dict,
) -> Union[AsyncFakePopen, asyncio.subprocess.Process]:
process = cls.__dispatch(command)
if process is None:
if cls.built_in_async_subprocess is None:
raise exceptions.PluginInternalError
async_shell: Awaitable[asyncio.subprocess.Process] = async_method()
return await async_shell
if sys.platform == "win32" and isinstance(
asyncio.get_event_loop_policy().get_event_loop(), asyncio.SelectorEventLoop
):
raise NotImplementedError(
"The SelectorEventLoop doesn't support subprocess"
)
result = cls._prepare_instance(AsyncFakePopen, command, kwargs, process)
if not isinstance(result, AsyncFakePopen):
raise exceptions.PluginInternalError
result.evaluate()
return result
@classmethod
def _prepare_instance(
cls,
klass: Union[Type[FakePopen], Type[AsyncFakePopen]],
command: COMMAND,
kwargs: dict,
process: dict,
) -> Union[FakePopen, AsyncFakePopen]:
# Update the command with the actual command specified by the caller.
# This will ensure that Command objects do not end up unexpectedly in
# caller's objects (e.g. proc.args, CalledProcessError.cmd). Take care
# to preserve the dict that may still be referenced when using
# keep_last_process.
fake_popen_kwargs = process.copy()
fake_popen_kwargs["command"] = command
recorder = fake_popen_kwargs.pop("recorder")
result = klass(**fake_popen_kwargs)
recorder.calls.append(result)
result.pid = cls._pid
result.configure(**kwargs)
return result
@classmethod
def __dispatch(cls, command: COMMAND) -> Optional[dict]:
command_instance, processes, process_instance = cls._get_process(command)
if process_instance:
process_instance.calls.append(command)
if not processes:
if not cls.process_list[-1]._allow_unregistered:
raise exceptions.ProcessNotRegisteredError(
"The process '%s' was not registered."
% (
(
command
if isinstance(command, str)
else " ".join(str(item) for item in command)
),
)
)
else:
return None
process = processes.popleft()
if not processes and process_instance is not None:
if cls.process_list[-1]._keep_last_process:
processes.append(process)
elif command_instance:
del process_instance.definitions[command_instance]
cls._pid += 1
if isinstance(process, bool):
# real process will be called
return None
return process
@classmethod
def _get_process(
cls, command: COMMAND
) -> Tuple[
Optional[Command], Optional[Deque[Union[dict, bool]]], Optional["FakeProcess"]
]:
for proc in reversed(cls.process_list):
command_instance, processes = next(
(
(key, value)
for key, value in proc.definitions.items()
if key == command
),
(None, None),
)
process_instance = proc
if processes and isinstance(processes, deque):
return command_instance, processes, process_instance
return None, None, None
class FakePopenWrapper(Generic[AnyStr]):
def __new__( # type: ignore
cls, command: COMMAND, **kwargs: Optional[Dict]
) -> FakePopen:
return ProcessDispatcher.dispatch(command, **kwargs) # type: ignore
|