File: test_observer.py

package info (click to toggle)
python-watchdog 6.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 808 kB
  • sloc: python: 6,384; ansic: 609; xml: 155; makefile: 124; sh: 8
file content (149 lines) | stat: -rw-r--r-- 4,004 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from __future__ import annotations

import contextlib
import threading
from typing import TYPE_CHECKING
from unittest.mock import patch

import pytest

from watchdog.events import FileModifiedEvent, FileSystemEventHandler
from watchdog.observers.api import BaseObserver, EventEmitter

if TYPE_CHECKING:
    from collections.abc import Iterator


@pytest.fixture
def observer() -> Iterator[BaseObserver]:
    obs = BaseObserver(EventEmitter)
    yield obs
    obs.stop()
    with contextlib.suppress(RuntimeError):
        obs.join()


@pytest.fixture
def observer2():
    obs = BaseObserver(EventEmitter)
    yield obs
    obs.stop()
    with contextlib.suppress(RuntimeError):
        obs.join()


def test_schedule_should_start_emitter_if_running(observer):
    observer.start()
    observer.schedule(None, "")
    (emitter,) = observer.emitters
    assert emitter.is_alive()


def test_schedule_should_not_start_emitter_if_not_running(observer):
    observer.schedule(None, "")
    (emitter,) = observer.emitters
    assert not emitter.is_alive()


def test_start_should_start_emitter(observer):
    observer.schedule(None, "")
    observer.start()
    (emitter,) = observer.emitters
    assert emitter.is_alive()


def test_stop_should_stop_emitter(observer):
    observer.schedule(None, "")
    observer.start()
    (emitter,) = observer.emitters
    assert emitter.is_alive()
    observer.stop()
    observer.join()
    assert not observer.is_alive()
    assert not emitter.is_alive()


def test_unschedule_self(observer):
    """
    Tests that unscheduling a watch from within an event handler correctly
    correctly unregisters emitter and handler without deadlocking.
    """

    class EventHandler(FileSystemEventHandler):
        def on_modified(self, event):
            observer.unschedule(watch)
            unschedule_finished.set()

    unschedule_finished = threading.Event()
    watch = observer.schedule(EventHandler(), "")
    observer.start()

    (emitter,) = observer.emitters
    emitter.queue_event(FileModifiedEvent(""))

    assert unschedule_finished.wait()
    assert len(observer.emitters) == 0


def test_schedule_after_unschedule_all(observer):
    observer.start()
    observer.schedule(None, "")
    assert len(observer.emitters) == 1

    observer.unschedule_all()
    assert len(observer.emitters) == 0

    observer.schedule(None, "")
    assert len(observer.emitters) == 1


def test_2_observers_on_the_same_path(observer, observer2):
    assert observer is not observer2

    observer.schedule(None, "")
    assert len(observer.emitters) == 1

    observer2.schedule(None, "")
    assert len(observer2.emitters) == 1


def test_start_failure_should_not_prevent_further_try(observer):
    observer.schedule(None, "")
    emitters = observer.emitters
    assert len(emitters) == 1

    # Make the emitter to fail on start()

    def mocked_start():
        raise OSError("Mock'ed!")

    emitter = next(iter(emitters))
    with patch.object(emitter, "start", new=mocked_start), pytest.raises(OSError, match="Mock'ed!"):
        observer.start()
    # The emitter should be removed from the list
    assert len(observer.emitters) == 0

    # Restoring the original behavior should work like there never be emitters
    observer.start()
    assert len(observer.emitters) == 0

    # Re-scheduling the watch should work
    observer.schedule(None, "")
    assert len(observer.emitters) == 1


def test_schedule_failure_should_not_prevent_future_schedules(observer):
    observer.start()

    # Make the emitter fail on start(), and subsequently the observer to fail on schedule()
    def bad_start(_):
        raise OSError("Mock'ed!")

    with patch.object(EventEmitter, "start", new=bad_start), pytest.raises(OSError, match="Mock'ed!"):
        observer.schedule(None, "")
    # The emitter should not be in the list
    assert not observer.emitters

    # Re-scheduling the watch should work
    observer.schedule(None, "")
    assert len(observer.emitters) == 1