File: utils.py

package info (click to toggle)
python-watchdog 6.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 808 kB
  • sloc: python: 6,384; ansic: 609; xml: 155; makefile: 124; sh: 8
file content (127 lines) | stat: -rw-r--r-- 3,912 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from __future__ import annotations

import dataclasses
import os
import subprocess
import sys
from queue import Queue
from typing import Protocol

from watchdog.events import FileSystemEvent
from watchdog.observers.api import EventEmitter, ObservedWatch
from watchdog.utils import platform

# Default emitter class for the running platform. Only declared here; the
# matching branch below binds it through an aliased import.
Emitter: type[EventEmitter]

if platform.is_linux():
    from watchdog.observers.inotify import InotifyEmitter as Emitter
    # Additional emitter class that is imported on the Linux branch only.
    from watchdog.observers.inotify import InotifyFullEmitter
elif platform.is_darwin():
    from watchdog.observers.fsevents import FSEventsEmitter as Emitter
elif platform.is_windows():
    from watchdog.observers.read_directory_changes import WindowsApiEmitter as Emitter
elif platform.is_bsd():
    from watchdog.observers.kqueue import KqueueEmitter as Emitter
# NOTE(review): there is no `else` branch, so `Emitter` stays unbound when none
# of the four platform checks above is true.


class P(Protocol):
    """Structural type for a callable mapping string components to one string.

    The signature mirrors ``Helper.joinpath``.
    """

    def __call__(self, *args: str) -> str:
        """Return a single string built from the positional ``args``."""


class StartWatching(Protocol):
    """Structural type for a callable with the keyword-only signature of ``Helper.start_watching``."""

    def __call__(
        self,
        *,
        path: bytes | str | None = ...,
        use_full_emitter: bool = ...,
        recursive: bool = ...,
    ) -> EventEmitter:
        """Return an emitter for ``path``; every argument must be passed by keyword."""


class ExpectEvent(Protocol):
    """Structural type for a callable that checks for one expected event.

    ``timeout`` is keyword-only here, whereas ``Helper.expect_event`` also
    accepts it positionally.
    """

    def __call__(self, expected_event: FileSystemEvent, *, timeout: float = ...) -> None:
        """Check for ``expected_event``, using ``timeout`` as the wait limit."""


# Queue of (event, watch) tuples. `Helper` hands this queue to every emitter it
# creates and reads the event (index 0) back out in `Helper.expect_event`.
TestEventQueue = Queue[tuple[FileSystemEvent, ObservedWatch]]


@dataclasses.dataclass()
class Helper:
    """Bundle a base directory, the emitters started for it and their shared event queue."""

    # Base directory; used as the default watch path and as the root for `joinpath`.
    tmp: str
    # Every emitter created by `start_watching`, kept so `close` can stop them.
    emitters: list[EventEmitter] = dataclasses.field(default_factory=list)
    # Queue handed to each emitter and drained by `expect_event`.
    event_queue: TestEventQueue = dataclasses.field(default_factory=Queue)

    def joinpath(self, *args: str) -> str:
        """Return ``self.tmp`` joined with the given path components."""
        base_dir = self.tmp
        return os.path.join(base_dir, *args)

    def start_watching(
        self,
        *,
        path: bytes | str | None = None,
        use_full_emitter: bool = False,
        recursive: bool = True,
    ) -> EventEmitter:
        """Create an emitter, record it in ``self.emitters``, start it and return it.

        Args:
            path: Location to watch; ``self.tmp`` is used when this is ``None``.
            use_full_emitter: On Linux, select ``InotifyFullEmitter`` instead of
                the platform default ``Emitter``. Ignored on other platforms.
            recursive: Forwarded to ``ObservedWatch``.
        """
        # TODO: check if other platforms expect the trailing slash (e.g. `p('')`)
        if path is None:
            path = self.tmp

        watch = ObservedWatch(path, recursive=recursive)

        emitter: EventEmitter
        if platform.is_linux() and use_full_emitter:
            emitter = InotifyFullEmitter(self.event_queue, watch)
        else:
            emitter = Emitter(self.event_queue, watch)

        if platform.is_darwin():
            # TODO: I think this could be better...  .suppress_history should maybe
            #       become a common attribute.
            from watchdog.observers.fsevents import FSEventsEmitter

            assert isinstance(emitter, FSEventsEmitter)
            emitter.suppress_history = True

        # Register before starting so `close` knows about the emitter even if
        # `start` fails.
        self.emitters.append(emitter)
        emitter.start()

        return emitter

    def expect_event(self, expected_event: FileSystemEvent, timeout: float = 2) -> None:
        """Take the next queue item and assert that its event equals ``expected_event``.

        Blocks for up to ``timeout`` seconds waiting for an item, which provides
        some robustness for the otherwise flaky nature of asynchronous
        notifications. ``queue.Empty`` propagates if nothing arrives in time.
        """
        queued_item = self.event_queue.get(timeout=timeout)
        received_event = queued_item[0]
        assert received_event == expected_event

    def close(self) -> None:
        """Stop all recorded emitters, wait for them to finish and forget them.

        Each emitter still alive after being stopped is joined for at most five
        seconds. The emitter list is cleared before asserting that none is
        still alive.
        """
        started = self.emitters

        # Signal every emitter first so they can shut down concurrently.
        for emitter in started:
            emitter.stop()

        for emitter in started:
            if not emitter.is_alive():
                continue
            emitter.join(5)

        still_running = [emitter.is_alive() for emitter in started]
        self.emitters = []
        assert still_running == [False] * len(still_running)


def run_isolated_test(path: str, timeout: float = 10) -> None:
    """Run a script from ``tests/isolated`` in a fresh interpreter and require exit status 0.

    Args:
        path: Script location, resolved against ``tests/isolated`` relative to
            the current working directory.
        timeout: Seconds to wait for the child before giving up, so a test gone
            haywire cannot run forever.

    Raises:
        subprocess.TimeoutExpired: The child did not finish within ``timeout``.
        AssertionError: The child exited with a non-zero status.
    """
    script = os.path.abspath(os.path.join("tests", "isolated", path))

    # The child sees the parent's import path plus the `src` directory that
    # sits next to this file's parent directory.
    src_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "src")
    child_env = os.environ.copy()
    child_env["PYTHONPATH"] = os.pathsep.join([*sys.path, src_dir])

    # `subprocess.run` kills the child on timeout and waits for it before
    # re-raising, so the process is always reaped.
    completed = subprocess.run(
        [sys.executable, script],
        env=child_env,
        timeout=timeout,
        check=False,
    )

    assert completed.returncode == 0, f"{script} exited with status {completed.returncode}"