"""Shared test helpers: emitter setup, event expectations, and isolated test runs."""
from __future__ import annotations
import dataclasses
import os
import subprocess
import sys
from queue import Queue
from typing import Protocol
from watchdog.events import FileSystemEvent
from watchdog.observers.api import EventEmitter, ObservedWatch
from watchdog.utils import platform
Emitter: type[EventEmitter]
if platform.is_linux():
from watchdog.observers.inotify import InotifyEmitter as Emitter
from watchdog.observers.inotify import InotifyFullEmitter
elif platform.is_darwin():
from watchdog.observers.fsevents import FSEventsEmitter as Emitter
elif platform.is_windows():
from watchdog.observers.read_directory_changes import WindowsApiEmitter as Emitter
elif platform.is_bsd():
from watchdog.observers.kqueue import KqueueEmitter as Emitter
class P(Protocol):
    """Structural type for a callable taking any number of strings and returning a string.

    NOTE(review): the signature mirrors ``Helper.joinpath``; presumably this
    types a path-joining fixture -- confirm against the fixture definitions.
    """

    def __call__(self, *args: str) -> str:
        """Accept zero or more string arguments and return a string."""
        ...
class StartWatching(Protocol):
    """Structural type for a callable matching ``Helper.start_watching``.

    All arguments are keyword-only and optional; the call returns the emitter
    that was created.
    """

    def __call__(
        self,
        *,
        path: bytes | str | None = ...,
        use_full_emitter: bool = ...,
        recursive: bool = ...,
    ) -> EventEmitter:
        """Create an emitter for ``path`` and return it."""
        ...
class ExpectEvent(Protocol):
    """Structural type for a callable that checks for an expected event.

    ``timeout`` is keyword-only here; ``Helper.expect_event`` also accepts it
    by keyword.
    """

    def __call__(
        self,
        expected_event: FileSystemEvent,
        *,
        timeout: float = ...,
    ) -> None:
        """Check for ``expected_event``, waiting at most ``timeout`` seconds."""
        ...
# Queue type used by ``Helper.event_queue``: each item is an ``(event, watch)`` pair.
TestEventQueue = Queue[tuple[FileSystemEvent, ObservedWatch]]
@dataclasses.dataclass()
class Helper:
    """State and utilities shared by emitter tests.

    A ``Helper`` owns one event queue, creates emitters that feed it, checks
    the events that arrive, and stops every emitter it created on ``close``.
    """

    # Base directory: default watch target and root for ``joinpath``.
    tmp: str
    # Every emitter created by ``start_watching``; emptied by ``close``.
    emitters: list[EventEmitter] = dataclasses.field(default_factory=list)
    # Queue handed to each emitter and read by ``expect_event``.
    event_queue: TestEventQueue = dataclasses.field(default_factory=Queue)

    def joinpath(self, *args: str) -> str:
        """Return ``args`` joined onto ``self.tmp`` with ``os.path.join``."""
        return os.path.join(self.tmp, *args)

    def start_watching(
        self,
        *,
        path: bytes | str | None = None,
        use_full_emitter: bool = False,
        recursive: bool = True,
    ) -> EventEmitter:
        """Create, register and start an emitter, then return it.

        Args:
            path: Location to watch; ``self.tmp`` is used when ``None``.
            use_full_emitter: On Linux, select ``InotifyFullEmitter`` instead
                of the platform default. Has no effect on other platforms.
            recursive: Forwarded to ``ObservedWatch``.
        """
        # TODO: check if other platforms expect the trailing slash (e.g. `p('')`)
        if path is None:
            path = self.tmp
        watch = ObservedWatch(path, recursive=recursive)

        # ``InotifyFullEmitter`` is only imported on Linux, so the platform
        # check must come first and short-circuit everywhere else.
        if platform.is_linux() and use_full_emitter:
            emitter_cls = InotifyFullEmitter
        else:
            emitter_cls = Emitter
        emitter = emitter_cls(self.event_queue, watch)

        if platform.is_darwin():
            # TODO: I think this could be better... .suppress_history should maybe
            #       become a common attribute.
            from watchdog.observers.fsevents import FSEventsEmitter

            assert isinstance(emitter, FSEventsEmitter)
            emitter.suppress_history = True

        # Register before starting so ``close`` always knows about the emitter.
        self.emitters.append(emitter)
        emitter.start()
        return emitter

    def expect_event(self, expected_event: FileSystemEvent, timeout: float = 2) -> None:
        """Assert that the next queued event equals ``expected_event``.

        Waits up to ``timeout`` seconds for an item to appear in the queue,
        which gives asynchronous notifications some slack. Only the next item
        is examined; its first element (the event) is compared.
        """
        received = self.event_queue.get(timeout=timeout)[0]
        assert received == expected_event

    def close(self) -> None:
        """Stop all registered emitters and assert that none is still alive."""
        # Signal every emitter first so they can wind down concurrently.
        for emitter in self.emitters:
            emitter.stop()

        for emitter in self.emitters:
            if not emitter.is_alive():
                continue
            emitter.join(5)

        # Take the liveness snapshot before clearing the list, so the list is
        # reset even when the assertion below fails.
        still_alive = [emitter.is_alive() for emitter in self.emitters]
        self.emitters = []
        assert still_alive == [False] * len(still_alive)
def run_isolated_test(path: str, *, timeout: float = 10) -> None:
    """Run a script from ``tests/isolated`` in a fresh interpreter and assert it succeeds.

    Args:
        path: Script path, joined onto ``tests/isolated`` and made absolute
            relative to the current working directory. An absolute ``path``
            is used as-is.
        timeout: Seconds to wait before the child is killed, so a runaway
            test cannot hang the suite.

    Raises:
        subprocess.TimeoutExpired: The script did not finish within ``timeout``.
        AssertionError: The script exited with a non-zero status.
    """
    script = os.path.abspath(os.path.join("tests", "isolated", path))

    # ``src`` is a sibling of this file's parent directory.
    src_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "src")

    # Give the child the parent's import path, plus ``src``.
    env = os.environ.copy()
    env["PYTHONPATH"] = os.pathsep.join([*sys.path, src_dir])

    # ``subprocess.run`` kills *and reaps* the child on timeout before
    # re-raising, so no zombie process is left behind.
    result = subprocess.run([sys.executable, script], env=env, timeout=timeout, check=False)
    assert result.returncode == 0, f"isolated test {script!r} exited with code {result.returncode}"
|