1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
|
import logging
import os
import sys
import warnings
from enum import IntEnum
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator, Callable, Generator, Optional, Set, Tuple, Union
import anyio
from ._rust_notify import RustNotify
from .filters import DefaultFilter
# Names that make up the public API of this module.
__all__ = 'watch', 'awatch', 'Change', 'FileChange'
# Module-level logger; every log record emitted below goes through the 'watchfiles.main' logger.
logger = logging.getLogger('watchfiles.main')
class Change(IntEnum):
    """
    Kind of change reported for a file or directory.

    Members are integers, so a raw integer code can be converted with `Change(code)`.
    """

    added = 1
    """A new file or directory was added."""

    modified = 2
    """A file or directory was modified, can be either a metadata or data change."""

    deleted = 3
    """A file or directory was deleted."""

    def raw_str(self) -> str:
        """Return the bare member name, e.g. `'added'`, without the enum class prefix."""
        member_name = self.name
        return member_name
# Type alias for a single reported change: a `(Change, path)` pair.
FileChange = Tuple[Change, str]
"""
A tuple representing a file change, first element is a [`Change`][watchfiles.Change] member, second is the path
of the file or directory that changed.
"""
if TYPE_CHECKING:
    # Everything in this branch exists for static type checkers only; none of it is imported or defined at runtime.
    import asyncio
    from typing import Protocol

    import trio

    # Event types accepted as `stop_event` by `awatch`.
    AnyEvent = Union[anyio.Event, asyncio.Event, trio.Event]

    class AbstractEvent(Protocol):
        """Structural type for `watch`'s `stop_event`: any object exposing `is_set() -> bool`."""

        def is_set(self) -> bool:
            """Report whether the event has been set."""
            ...
def watch(
    *paths: Union[Path, str],
    watch_filter: Optional[Callable[['Change', str], bool]] = DefaultFilter(),
    debounce: int = 1_600,
    step: int = 50,
    stop_event: Optional['AbstractEvent'] = None,
    rust_timeout: int = 5_000,
    yield_on_timeout: bool = False,
    debug: Optional[bool] = None,
    raise_interrupt: bool = True,
    force_polling: Optional[bool] = None,
    poll_delay_ms: int = 300,
    recursive: bool = True,
    ignore_permission_denied: Optional[bool] = None,
) -> Generator[Set[FileChange], None, None]:
    """
    Watch one or more paths and yield a set of changes whenever files change.

    The paths watched can be directories or files, directories are watched recursively - changes in subdirectories
    are also detected.

    #### Force polling

    Notify will fall back to file polling if it can't use file system notifications, but we also force Notify
    to use polling if the `force_polling` argument is `True`; if `force_polling` is unset (or `None`), we enable
    force polling thus:

    * if the `WATCHFILES_FORCE_POLLING` environment variable exists and is not empty:
        * if the value is `false`, `disable` or `disabled`, force polling is disabled
        * otherwise, force polling is enabled
    * otherwise, we enable force polling only if we detect we're running on WSL (Windows Subsystem for Linux)

    It is also possible to change the poll delay between iterations, it can be changed to maintain a good response time
    and an appropriate CPU consumption using the `poll_delay_ms` argument, we change poll delay thus:

    * if file polling is enabled and the `WATCHFILES_POLL_DELAY_MS` env var exists and it is numeric, we use that
    * otherwise, we use the argument value

    Args:
        *paths: filesystem paths to watch.
        watch_filter: callable used to filter out changes which are not important, you can either use a raw callable
            or a [`BaseFilter`][watchfiles.BaseFilter] instance,
            defaults to an instance of [`DefaultFilter`][watchfiles.DefaultFilter]. To keep all changes, use `None`.
        debounce: maximum time in milliseconds to group changes over before yielding them.
        step: time to wait for new changes in milliseconds, if no changes are detected in this time, and
            at least one change has been detected, the changes are yielded.
        stop_event: event to stop watching, if this is set, the generator will stop iteration,
            this can be anything with an `is_set()` method which returns a bool, e.g. `threading.Event()`.
        rust_timeout: maximum time in milliseconds to wait in the rust code for changes, `0` means no timeout.
        yield_on_timeout: if `True`, the generator will yield upon timeout in rust even if no changes are detected.
        debug: whether to print information about all filesystem changes in rust to stdout, if `None` will use the
            `WATCHFILES_DEBUG` environment variable.
        raise_interrupt: whether to re-raise `KeyboardInterrupt`s, or suppress the error and just stop iterating.
        force_polling: See [Force polling](#force-polling) above.
        poll_delay_ms: delay between polling for changes, only used if `force_polling=True`.
        recursive: if `True`, watch for changes in sub-directories recursively, otherwise watch only for changes in the
            top-level directory, default is `True`.
        ignore_permission_denied: if `True`, will ignore permission denied errors, otherwise will raise them by default.
            Setting the `WATCHFILES_IGNORE_PERMISSION_DENIED` environment variable will set this value too.

    Yields:
        The generator yields sets of [`FileChange`][watchfiles.main.FileChange]s.

    ```py title="Example of watch usage"
    from watchfiles import watch

    for changes in watch('./first/dir', './second/dir', raise_interrupt=False):
        print(changes)
    ```
    """
    # Resolve every "None means consult the environment" option before the watcher is created.
    force_polling = _default_force_polling(force_polling)
    poll_delay_ms = _default_poll_delay_ms(poll_delay_ms)
    ignore_permission_denied = _default_ignore_permission_denied(ignore_permission_denied)
    debug = _default_debug(debug)

    watched = [str(p) for p in paths]
    with RustNotify(watched, debug, force_polling, poll_delay_ms, recursive, ignore_permission_denied) as watcher:
        while True:
            # `outcome` is either one of the marker strings handled below or the raw changes themselves.
            outcome = watcher.watch(debounce, step, rust_timeout, stop_event)

            if outcome == 'stop':
                return

            if outcome == 'signal':
                if raise_interrupt:
                    raise KeyboardInterrupt
                logger.warning('KeyboardInterrupt caught, stopping watch')
                return

            if outcome == 'timeout':
                if yield_on_timeout:
                    yield set()
                else:
                    logger.debug('rust notify timeout, continuing')
                continue

            changes = _prep_changes(outcome, watch_filter)
            if changes:
                _log_changes(changes)
                yield changes
            else:
                logger.debug('all changes filtered out, raw_changes=%s', outcome)
async def awatch(  # noqa: C901
    *paths: Union[Path, str],
    watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
    debounce: int = 1_600,
    step: int = 50,
    stop_event: Optional['AnyEvent'] = None,
    rust_timeout: Optional[int] = None,
    yield_on_timeout: bool = False,
    debug: Optional[bool] = None,
    raise_interrupt: Optional[bool] = None,
    force_polling: Optional[bool] = None,
    poll_delay_ms: int = 300,
    recursive: bool = True,
    ignore_permission_denied: Optional[bool] = None,
) -> AsyncGenerator[Set[FileChange], None]:
    """
    Asynchronous equivalent of [`watch`][watchfiles.watch] using threads to wait for changes.
    Arguments match those of [`watch`][watchfiles.watch] except `stop_event`.

    All async methods use [anyio](https://anyio.readthedocs.io/en/latest/) to run the event loop.

    Unlike [`watch`][watchfiles.watch] `KeyboardInterrupt` cannot be suppressed by `awatch` so they need to be caught
    where `asyncio.run` or equivalent is called.

    Args:
        *paths: filesystem paths to watch.
        watch_filter: matches the same argument of [`watch`][watchfiles.watch].
        debounce: matches the same argument of [`watch`][watchfiles.watch].
        step: matches the same argument of [`watch`][watchfiles.watch].
        stop_event: `anyio.Event` which can be used to stop iteration, see example below.
        rust_timeout: matches the same argument of [`watch`][watchfiles.watch], except that `None` means
            use `1_000` on Windows and `5_000` on other platforms thus helping with exiting on `Ctrl+C` on Windows,
            see [#110](https://github.com/samuelcolvin/watchfiles/issues/110).
        yield_on_timeout: matches the same argument of [`watch`][watchfiles.watch].
        debug: matches the same argument of [`watch`][watchfiles.watch].
        raise_interrupt: This is deprecated, `KeyboardInterrupt` will cause this coroutine to be cancelled and then
            be raised by the top level `asyncio.run` call or equivalent, and should be caught there.
            See [#136](https://github.com/samuelcolvin/watchfiles/issues/136)
        force_polling: if true, always use polling instead of file system notifications, default is `None` where
            the value is derived from the `WATCHFILES_FORCE_POLLING` environment variable (or WSL detection),
            exactly as described for [`watch`][watchfiles.watch].
        poll_delay_ms: delay between polling for changes, only used if `force_polling=True`.
            `poll_delay_ms` can be changed via the `WATCHFILES_POLL_DELAY_MS` environment variable.
        recursive: if `True`, watch for changes in sub-directories recursively, otherwise watch only for changes in the
            top-level directory, default is `True`.
        ignore_permission_denied: if `True`, will ignore permission denied errors, otherwise will raise them by default.
            Setting the `WATCHFILES_IGNORE_PERMISSION_DENIED` environment variable will set this value too.

    Yields:
        The generator yields sets of [`FileChange`][watchfiles.main.FileChange]s.

    ```py title="Example of awatch usage"
    import asyncio
    from watchfiles import awatch

    async def main():
        async for changes in awatch('./first/dir', './second/dir'):
            print(changes)

    if __name__ == '__main__':
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print('stopped via KeyboardInterrupt')
    ```

    ```py title="Example of awatch usage with a stop event"
    import asyncio
    from watchfiles import awatch

    async def main():
        stop_event = asyncio.Event()

        async def stop_soon():
            await asyncio.sleep(3)
            stop_event.set()

        stop_soon_task = asyncio.create_task(stop_soon())

        async for changes in awatch('/path/to/dir', stop_event=stop_event):
            print(changes)

        # cleanup by awaiting the (now complete) stop_soon_task
        await stop_soon_task

    asyncio.run(main())
    ```
    """
    if raise_interrupt is not None:
        warnings.warn(
            'raise_interrupt is deprecated, KeyboardInterrupt will cause this coroutine to be cancelled and then '
            'be raised by the top level asyncio.run call or equivalent, and should be caught there. See #136.',
            DeprecationWarning,
        )

    # Always hand the worker thread an event, so cancellation below has something to set.
    stopper: AnyEvent = anyio.Event() if stop_event is None else stop_event

    # Resolve every "None means consult the environment" option before the watcher is created.
    force_polling = _default_force_polling(force_polling)
    poll_delay_ms = _default_poll_delay_ms(poll_delay_ms)
    ignore_permission_denied = _default_ignore_permission_denied(ignore_permission_denied)
    debug = _default_debug(debug)

    watched = [str(p) for p in paths]
    with RustNotify(watched, debug, force_polling, poll_delay_ms, recursive, ignore_permission_denied) as watcher:
        thread_timeout = _calc_async_timeout(rust_timeout)
        cancelled_exc = anyio.get_cancelled_exc_class()

        while True:
            async with anyio.create_task_group() as tg:
                try:
                    # the blocking watch call runs in a worker thread
                    outcome = await anyio.to_thread.run_sync(watcher.watch, debounce, step, thread_timeout, stopper)
                except (cancelled_exc, KeyboardInterrupt):
                    # set the event that was passed to the watch call before propagating
                    stopper.set()
                    # suppressing KeyboardInterrupt wouldn't stop it getting raised by the top level asyncio.run call
                    raise
                tg.cancel_scope.cancel()

            if outcome == 'stop':
                return

            if outcome == 'signal':
                # in theory the watch thread should never get a signal
                raise RuntimeError('watch thread unexpectedly received a signal')

            if outcome == 'timeout':
                if yield_on_timeout:
                    yield set()
                else:
                    logger.debug('rust notify timeout, continuing')
                continue

            changes = _prep_changes(outcome, watch_filter)
            if changes:
                _log_changes(changes)
                yield changes
            else:
                logger.debug('all changes filtered out, raw_changes=%s', outcome)
def _prep_changes(
    raw_changes: Set[Tuple[int, str]], watch_filter: Optional[Callable[[Change, str], bool]]
) -> Set[FileChange]:
    """
    Convert raw `(int, path)` pairs into `(Change, path)` pairs and apply the optional filter.

    Args:
        raw_changes: pairs of integer change code and path.
        watch_filter: called as `watch_filter(change, path)`; only pairs for which it returns a truthy value
            are kept. A falsy `watch_filter` (e.g. `None`) keeps everything.

    Returns:
        The converted, and possibly filtered, set of changes.
    """
    # if we wanted to be really snazzy, we could move this into rust
    typed = {(Change(code), path) for code, path in raw_changes}
    if not watch_filter:
        return typed
    return {item for item in typed if watch_filter(*item)}
def _log_changes(changes: Set[FileChange]) -> None:
    """
    Log how many changes were detected.

    When DEBUG is enabled the changes themselves are included in a DEBUG record; otherwise only the count is
    logged at INFO. Nothing is logged when INFO is disabled.
    """
    if logger.isEnabledFor(logging.INFO):  # pragma: no branch
        total = len(changes)
        suffix = 's' if total != 1 else ''
        if not logger.isEnabledFor(logging.DEBUG):
            logger.info('%d change%s detected', total, suffix)
        else:
            logger.debug('%d change%s detected: %s', total, suffix, changes)
def _calc_async_timeout(timeout: Optional[int]) -> int:
    """
    Pick the rust timeout (in milliseconds) used by `awatch`.

    An explicit `timeout` is returned unchanged; `None` becomes `1_000` on Windows and `5_000` elsewhere,
    see https://github.com/samuelcolvin/watchfiles/issues/110
    """
    if timeout is not None:
        return timeout
    return 1_000 if sys.platform == 'win32' else 5_000
def _default_force_polling(force_polling: Optional[bool]) -> bool:
    """
    Resolve the effective `force_polling` value.

    An explicit argument wins. Otherwise a non-empty `WATCHFILES_FORCE_POLLING` environment variable enables
    polling unless its value (case-insensitively) is `false`, `disable` or `disabled`. If the variable is
    unset or empty, the result of `_auto_force_polling()` is used.

    See samuelcolvin/watchfiles#167 and samuelcolvin/watchfiles#187 for discussion and rationale.
    """
    if force_polling is not None:
        return force_polling

    setting = os.getenv('WATCHFILES_FORCE_POLLING')
    if not setting:
        return _auto_force_polling()
    return setting.lower() not in {'false', 'disable', 'disabled'}
def _default_poll_delay_ms(poll_delay_ms: int) -> int:
    """
    Resolve the effective poll delay in milliseconds.

    If the `WATCHFILES_POLL_DELAY_MS` environment variable is set to a decimal string, that value is used;
    otherwise the `poll_delay_ms` argument is returned unchanged.
    """
    override = os.getenv('WATCHFILES_POLL_DELAY_MS')
    if override and override.isdecimal():
        return int(override)
    return poll_delay_ms
def _default_debug(debug: Optional[bool]) -> bool:
    """
    Resolve the effective `debug` flag.

    An explicit argument wins; `None` means debug is on exactly when the `WATCHFILES_DEBUG` environment
    variable is set to a non-empty value.
    """
    if debug is None:
        return bool(os.getenv('WATCHFILES_DEBUG'))
    return debug
def _auto_force_polling() -> bool:
    """
    Whether to auto-enable force polling, it should be enabled automatically only on WSL.

    WSL is detected as a `linux` system whose release string contains `microsoft-standard`
    (both compared case-insensitively).

    See samuelcolvin/watchfiles#187 for discussion.
    """
    import platform

    info = platform.uname()
    if 'microsoft-standard' not in info.release.lower():
        return False
    return info.system.lower() == 'linux'
def _default_ignore_permission_denied(ignore_permission_denied: Optional[bool]) -> bool:
    """
    Resolve the effective `ignore_permission_denied` flag.

    An explicit argument wins; `None` means the flag is on exactly when the
    `WATCHFILES_IGNORE_PERMISSION_DENIED` environment variable is set to a non-empty value.
    """
    if ignore_permission_denied is None:
        from_env = os.getenv('WATCHFILES_IGNORE_PERMISSION_DENIED')
        return bool(from_env)
    return ignore_permission_denied
|