File: process_recorder.py

package info (click to toggle)
pytest-subprocess 1.5.3-1~bpo12%2B1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm-backports
  • size: 444 kB
  • sloc: python: 2,319; makefile: 17
file content (63 lines) | stat: -rw-r--r-- 1,967 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from typing import Iterator
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
from typing import Union

from .types import COMMAND
from .utils import Command

if TYPE_CHECKING:
    from .fake_popen import FakePopen, AsyncFakePopen


class ProcessRecorder:
    """
    Recorder class that holds FakePopen (or AsyncFakePopen) instances in the
    ``calls`` list, in the order they were appended, and provides auxiliary
    helpers to inspect them (first/last call, counting and matching by
    command).

    The recorder itself never appends to ``calls``; it is populated by code
    outside this class.
    """

    # Recorded process instances, oldest first.
    calls: List[Union["FakePopen", "AsyncFakePopen"]]

    def __init__(self) -> None:
        self.calls = []

    @property
    def first_call(self) -> Optional[Union["FakePopen", "AsyncFakePopen"]]:
        """Get the first process call, or None when nothing was recorded."""
        return self.calls[0] if self.calls else None

    @property
    def last_call(self) -> Optional[Union["FakePopen", "AsyncFakePopen"]]:
        """Get the last (latest) process call, or None when nothing was recorded."""
        return self.calls[-1] if self.calls else None

    def call_count(self, command: Optional[COMMAND] = None) -> int:
        """
        Get process call count - optionally match with arguments.

        A falsy ``command`` (``None`` or an empty value) counts every
        recorded call; otherwise only calls matching ``command`` are counted.
        """
        if not command:
            return len(self.calls)
        # Count lazily instead of materialising the matches just for len().
        return sum(1 for _ in self.get_matching_calls(command))

    def was_called(self, command: Optional[COMMAND] = None) -> bool:
        """
        Check if process was called - optionally match with arguments.

        Returns False when nothing was recorded. With a falsy ``command``
        any recorded call counts; otherwise at least one recorded call must
        match ``command``.
        """
        if not self.calls:
            return False
        if not command:
            return True
        # Test for existence of a match rather than the truthiness of the
        # matched object, so the answer does not depend on how the recorded
        # instances behave in a boolean context.
        return any(True for _ in self.get_matching_calls(command))

    def get_matching_calls(
        self, command: COMMAND
    ) -> Iterator[Union["FakePopen", "AsyncFakePopen"]]:
        """
        Get calls that match arguments.

        ``command`` is wrapped in ``Command`` (unless it already is one)
        right away; the returned iterator is lazy and compares that
        ``Command`` against each recorded call's ``args`` as it is consumed.
        """
        if not isinstance(command, Command):
            command = Command(command)
        # Keep ``command`` on the left-hand side so its ``__eq__`` is the one
        # consulted first for the comparison.
        return (call for call in self.calls if command == call.args)

    def clear(self) -> None:
        """Clear records"""
        self.calls.clear()