File: test_inotify_c.py

package info (click to toggle)
python-watchdog 6.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 808 kB
  • sloc: python: 6,384; ansic: 609; xml: 155; makefile: 124; sh: 8
file content (211 lines) | stat: -rw-r--r-- 6,847 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
from __future__ import annotations

from contextlib import ExitStack

import pytest

from watchdog.utils import platform

if not platform.is_linux():
    pytest.skip("GNU/Linux only.", allow_module_level=True)

import ctypes
import errno
import logging
import os
import select
import struct
from typing import TYPE_CHECKING
from unittest.mock import patch

from watchdog.events import DirCreatedEvent, DirDeletedEvent, DirModifiedEvent
from watchdog.observers.inotify_c import Inotify, InotifyConstants, InotifyEvent

if TYPE_CHECKING:
    from .utils import Helper, P, StartWatching, TestEventQueue

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


def struct_inotify(wd, mask, cookie=0, length=0, name=b""):
    assert len(name) <= length
    struct_format = (
        "="  # (native endianness, standard sizes)
        "i"  # int      wd
        "i"  # uint32_t mask
        "i"  # uint32_t cookie
        "i"  # uint32_t len
        f"{length}s"  # char[] name
    )
    return struct.pack(struct_format, wd, mask, cookie, length, name)


def test_late_double_deletion(helper: Helper, p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
    inotify_fd = type("FD", (object,), {})()
    inotify_fd.last = 0
    inotify_fd.wds = []

    const = InotifyConstants()

    # CREATE DELETE CREATE DELETE DELETE_SELF IGNORE DELETE_SELF IGNORE
    inotify_fd.buf = (
        struct_inotify(wd=1, mask=const.IN_CREATE | const.IN_ISDIR, length=16, name=b"subdir1")
        + struct_inotify(wd=1, mask=const.IN_DELETE | const.IN_ISDIR, length=16, name=b"subdir1")
    ) * 2 + (
        struct_inotify(wd=2, mask=const.IN_DELETE_SELF)
        + struct_inotify(wd=2, mask=const.IN_IGNORED)
        + struct_inotify(wd=3, mask=const.IN_DELETE_SELF)
        + struct_inotify(wd=3, mask=const.IN_IGNORED)
    )

    select_bkp = select.select

    def fakeselect(read_list, *args, **kwargs):
        if inotify_fd in read_list:
            return [inotify_fd], [], []
        return select_bkp(read_list, *args, **kwargs)

    poll_bkp = select.poll

    class Fakepoll:
        def __init__(self):
            self._orig = poll_bkp()
            self._fake = False

        def register(self, fd, *args, **kwargs):
            if fd == inotify_fd:
                self._fake = True
                return None
            return self._orig.register(fd, *args, **kwargs)

        def poll(self, *args, **kwargs):
            if self._fake:
                return [(inotify_fd, select.POLLIN)]
            return self._orig.poll(*args, **kwargs)

    os_read_bkp = os.read

    def fakeread(fd, length):
        if fd is inotify_fd:
            result, fd.buf = fd.buf[:length], fd.buf[length:]
            return result
        return os_read_bkp(fd, length)

    os_close_bkp = os.close

    def fakeclose(fd):
        if fd is not inotify_fd:
            os_close_bkp(fd)

    def inotify_init():
        return inotify_fd

    def inotify_add_watch(fd, path, mask):
        fd.last += 1
        logger.debug("New wd = %d", fd.last)
        fd.wds.append(fd.last)
        return fd.last

    def inotify_rm_watch(fd, wd):
        logger.debug("Removing wd = %d", wd)
        fd.wds.remove(wd)
        return 0

    # Mocks the API!
    from watchdog.observers import inotify_c

    mock1 = patch.object(os, "read", new=fakeread)
    mock2 = patch.object(os, "close", new=fakeclose)
    mock3 = patch.object(inotify_c, "inotify_init", new=inotify_init)
    mock4 = patch.object(inotify_c, "inotify_add_watch", new=inotify_add_watch)
    mock5 = patch.object(inotify_c, "inotify_rm_watch", new=inotify_rm_watch)
    mock6 = patch.object(select, "select", new=fakeselect)
    mock7 = patch.object(select, "poll", new=Fakepoll)

    with mock1, mock2, mock3, mock4, mock5, mock6, mock7:
        start_watching(path=p(""))
        # Watchdog Events
        for evt_cls in [DirCreatedEvent, DirDeletedEvent] * 2:
            event = event_queue.get(timeout=5)[0]
            assert isinstance(event, evt_cls)
            assert event.src_path == p("subdir1")
            event = event_queue.get(timeout=5)[0]
            assert isinstance(event, DirModifiedEvent)
            assert event.src_path == p("").rstrip(os.path.sep)
        helper.close()

    assert inotify_fd.last == 3  # Number of directories
    assert inotify_fd.buf == b""  # Didn't miss any event
    assert inotify_fd.wds == [2, 3]  # Only 1 is removed explicitly


@pytest.mark.parametrize(
    ("error", "pattern"),
    [
        (errno.ENOSPC, "inotify watch limit reached"),
        (errno.EMFILE, "inotify instance limit reached"),
        (errno.ENOENT, "No such file or directory"),
        (-1, "error"),
    ],
)
def test_raise_error(error, pattern):
    with patch.object(ctypes, "get_errno", new=lambda: error), pytest.raises(OSError, match=pattern) as exc:
        Inotify._raise_error()  # noqa: SLF001
    assert exc.value.errno == error


def test_non_ascii_path(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
    """
    Inotify can construct an event for a path containing non-ASCII.
    """
    path = p("\N{SNOWMAN}")
    start_watching(path=p(""))
    os.mkdir(path)
    event, _ = event_queue.get(timeout=5)
    assert isinstance(event.src_path, str)
    assert event.src_path == path
    # Just make sure it doesn't raise an exception.
    assert repr(event)


def test_watch_file(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
    path = p("this_is_a_file")
    with open(path, "a"):
        pass
    start_watching(path=path)
    os.remove(path)
    event, _ = event_queue.get(timeout=5)
    assert repr(event)


def test_event_equality(p: P) -> None:
    wd_parent_dir = 42
    filename = "file.ext"
    full_path = p(filename)
    event1 = InotifyEvent(wd_parent_dir, InotifyConstants.IN_CREATE, 0, filename, full_path)
    event2 = InotifyEvent(wd_parent_dir, InotifyConstants.IN_CREATE, 0, filename, full_path)
    event3 = InotifyEvent(wd_parent_dir, InotifyConstants.IN_ACCESS, 0, filename, full_path)
    assert event1 == event2
    assert event1 != event3
    assert event2 != event3


def test_select_fd(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None:
    # We open a file 2048 times to ensure that we exhaust 1024 file
    # descriptors, the limit of a select() call.
    path = p("new_file")
    with open(path, "a"):
        pass
    with ExitStack() as stack:
        for _i in range(2048):
            stack.enter_context(open(path))

        # Watch this file for deletion (copied from `test_watch_file`)
        path = p("this_is_a_file")
        with open(path, "a"):
            pass
        start_watching(path=path)
        os.remove(path)
        event, _ = event_queue.get(timeout=5)
        assert repr(event)