1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
|
# Copyright (c) 2016 The aionotify project
# This code is distributed under the two-clause BSD License.
import asyncio
import logging
import os
import os.path
import tempfile
import unittest
import asynctest
import aionotify
AIODEBUG = bool(os.environ.get('PYTHONAIODEBUG') == '1')
if AIODEBUG:
logger = logging.getLogger('asyncio')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
TESTDIR = os.environ.get('AIOTESTDIR') or os.path.join(os.path.dirname(__file__), 'testevents')
class AIONotifyTestCase(asynctest.TestCase):
forbid_get_event_loop = True
timeout = 3
def setUp(self):
if AIODEBUG:
self.loop.set_debug(True)
self.watcher = aionotify.Watcher()
self._testdir = tempfile.TemporaryDirectory(dir=TESTDIR)
self.testdir = self._testdir.name
# Schedule a loop shutdown
self.loop.call_later(self.timeout, self.loop.stop)
def tearDown(self):
if not self.watcher.closed:
self.watcher.close()
self._testdir.cleanup()
self.assertFalse(os.path.exists(self.testdir))
# Utility functions
# =================
# Those allow for more readable tests.
def _touch(self, filename, *, parent=None):
path = os.path.join(parent or self.testdir, filename)
with open(path, 'w') as f:
f.write('')
def _unlink(self, filename, *, parent=None):
path = os.path.join(parent or self.testdir, filename)
os.unlink(path)
def _rename(self, source, target, *, parent=None):
source_path = os.path.join(parent or self.testdir, source)
target_path = os.path.join(parent or self.testdir, target)
os.rename(source_path, target_path)
def _assert_file_event(self, event, name, flags=aionotify.Flags.CREATE, alias=None):
"""Check for an expected file event.
Allows for more readable tests.
"""
if alias is None:
alias = self.testdir
self.assertEqual(name, event.name)
self.assertEqual(flags, event.flags)
self.assertEqual(alias, event.alias)
async def _assert_no_events(self, timeout=0.1):
"""Ensure that no events are left in the queue."""
task = self.watcher.get_event()
try:
result = await asyncio.wait_for(task, timeout, loop=self.loop)
except asyncio.TimeoutError:
# All fine: we didn't receive any event.
pass
else:
raise AssertionError("Event %r occurred within timeout %s" % (result, timeout))
class SimpleUsageTests(AIONotifyTestCase):
async def test_watch_before_start(self):
"""A watch call is valid before startup."""
self.watcher.watch(self.testdir, aionotify.Flags.CREATE)
await self.watcher.setup(self.loop)
# Touch a file: we get the event.
self._touch('a')
event = await self.watcher.get_event()
self._assert_file_event(event, 'a')
# And it's over.
await self._assert_no_events()
async def test_watch_after_start(self):
"""A watch call is valid after startup."""
await self.watcher.setup(self.loop)
self.watcher.watch(self.testdir, aionotify.Flags.CREATE)
# Touch a file: we get the event.
self._touch('a')
event = await self.watcher.get_event()
self._assert_file_event(event, 'a')
# And it's over.
await self._assert_no_events()
async def test_event_ordering(self):
"""Events should arrive in the order files where created."""
await self.watcher.setup(self.loop)
self.watcher.watch(self.testdir, aionotify.Flags.CREATE)
# Touch 2 files
self._touch('a')
self._touch('b')
# Get the events
event1 = await self.watcher.get_event()
event2 = await self.watcher.get_event()
self._assert_file_event(event1, 'a')
self._assert_file_event(event2, 'b')
# And it's over.
await self._assert_no_events()
async def test_filtering_events(self):
"""We only get targeted events."""
await self.watcher.setup(self.loop)
self.watcher.watch(self.testdir, aionotify.Flags.CREATE)
self._touch('a')
event = await self.watcher.get_event()
self._assert_file_event(event, 'a')
# Perform a filtered-out event; we shouldn't see anything
self._unlink('a')
await self._assert_no_events()
async def test_watch_unwatch(self):
"""Watches can be removed."""
self.watcher.watch(self.testdir, aionotify.Flags.CREATE)
await self.watcher.setup(self.loop)
self.watcher.unwatch(self.testdir)
await asyncio.sleep(0.1)
# Touch a file; we shouldn't see anything.
self._touch('a')
await self._assert_no_events()
async def test_watch_unwatch_before_drain(self):
"""Watches can be removed, no events occur afterwards."""
self.watcher.watch(self.testdir, aionotify.Flags.CREATE)
await self.watcher.setup(self.loop)
# Touch a file before unwatching
self._touch('a')
self.watcher.unwatch(self.testdir)
# We shouldn't see anything.
await self._assert_no_events()
async def test_rename_detection(self):
"""A file rename can be detected through event cookies."""
self.watcher.watch(self.testdir, aionotify.Flags.MOVED_FROM | aionotify.Flags.MOVED_TO)
await self.watcher.setup(self.loop)
self._touch('a')
# Rename a file => two events
self._rename('a', 'b')
event1 = await self.watcher.get_event()
event2 = await self.watcher.get_event()
# We got moved_from then moved_to; they share the same cookie.
self._assert_file_event(event1, 'a', aionotify.Flags.MOVED_FROM)
self._assert_file_event(event2, 'b', aionotify.Flags.MOVED_TO)
self.assertEqual(event1.cookie, event2.cookie)
# And it's over.
await self._assert_no_events()
class ErrorTests(AIONotifyTestCase):
"""Test error cases."""
async def test_watch_nonexistent(self):
"""Watching a non-existent directory raises an OSError."""
badpath = os.path.join(self.testdir, 'nonexistent')
self.watcher.watch(badpath, aionotify.Flags.CREATE)
with self.assertRaises(OSError):
await self.watcher.setup(self.loop)
async def test_unwatch_bad_alias(self):
self.watcher.watch(self.testdir, aionotify.Flags.CREATE)
await self.watcher.setup(self.loop)
with self.assertRaises(ValueError):
self.watcher.unwatch('blah')
class SanityTests(AIONotifyTestCase):
timeout = 0.1
@unittest.expectedFailure
async def test_timeout_works(self):
"""A test cannot run longer than the defined timeout."""
# This test should fail, since we're setting a global timeout of 0.1 yet ask to wait for 0.3 seconds.
await asyncio.sleep(0.5)
|