1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
|
"""
fs.tests.test_watch: testcases for change watcher support
"""
import os
import sys
import time
import gc
import pickle
import unittest
from fs.path import *
from fs.errors import *
from fs.watch import *
from fs.tests import FSTestCases
try:
from fs.osfs import watch_inotify
except ImportError:
watch_inotify = None
if sys.platform == "win32":
try:
from fs.osfs import watch_win32
except ImportError:
watch_win32 = None
else:
watch_win32 = None
import logging
logging.getLogger('pyinotify').setLevel(logging.ERROR)
import six
from six import PY3, b
class WatcherTestCases:
    """Testcases for filesystems providing change watcher support.

    This class should be used as a mixin to the unittest.TestCase class
    for filesystems that provide change watcher support.

    The methods read two attributes that the concrete test class must set
    up: ``self.fs``, through which changes are made, and ``self.watchfs``,
    on which watchers are registered.
    """

    def setupWatchers(self):
        """Register a catch-all watcher that records every event it gets."""
        self._captured_events = []
        self.watchfs.add_watcher(self._captured_events.append)

    def clearCapturedEvents(self):
        """Empty the captured-event list in place.

        The list object itself is kept, because its bound ``append`` is
        what was registered as the watcher callback.
        """
        del self._captured_events[:]

    def waitForEvents(self):
        """Block long enough for pending change events to be delivered.

        For a PollingWatchableFS this waits on its ``_poll_cond`` twice
        (presumably one notification per poll cycle, so that a complete
        cycle has run since the call - TODO confirm against fs.watch).
        For any other watchable filesystem it sleeps for two seconds.
        """
        if isinstance(self.watchfs, PollingWatchableFS):
            poll_cond = self.watchfs._poll_cond
            poll_cond.acquire()
            try:
                poll_cond.wait()
                poll_cond.wait()
            finally:
                # Always hand the lock back, even if a wait is interrupted,
                # so nothing else is left blocked on it.
                poll_cond.release()
        else:
            time.sleep(2)

    def assertEventOccurred(self, cls, path=None, event_list=None, **attrs):
        """Fail unless a matching event has been captured.

        Takes the same arguments as checkEventOccurred() and raises
        AssertionError naming the missing event when nothing matches.
        """
        if not self.checkEventOccurred(cls, path, event_list, **attrs):
            args = (cls.__name__, path, attrs)
            # Raised explicitly rather than through ``assert`` so that the
            # check still fires when Python runs with -O.
            raise AssertionError("Event did not occur: %s(%s,%s)" % args)

    def checkEventOccurred(self, cls, path=None, event_list=None, **attrs):
        """Return True if a matching event is found, False otherwise.

        cls        -- class the event must be an instance of
        path       -- required ``event.path``; None matches any path
        event_list -- events to search; defaults to the list filled by the
                      watcher registered in setupWatchers()
        attrs      -- extra attribute name/value pairs the event must have

        waitForEvents() is called before the list is searched.
        """
        if event_list is None:
            event_list = self._captured_events
        self.waitForEvents()
        for event in event_list:
            if not isinstance(event, cls):
                continue
            if path is not None and event.path != path:
                continue
            # dict.items() exists on both Python 2 and 3, whereas
            # iteritems() is Python 2 only.
            for (k, v) in attrs.items():
                if getattr(event, k) != v:
                    break
            else:
                # all attrs match - found it!
                return True
        return False

    def _bumpAccessTime(self, path):
        """Set the atime of ``path`` to now, leaving its mtime unchanged.

        Does nothing when ``self.fs`` reports no system path for ``path``.
        """
        if self.fs.hassyspath(path):
            syspath = self.fs.getsyspath(path)
            mtime = os.stat(syspath).st_mtime
            atime = int(time.time())
            os.utime(syspath, (atime, mtime))

    def test_watch_makedir(self):
        # Creating a directory must produce a CREATED event for it.
        self.setupWatchers()
        self.fs.makedir("test1")
        self.assertEventOccurred(CREATED, "/test1")

    def test_watch_makedir_with_two_watchers(self):
        # Every registered watcher must receive the CREATED event.
        self.setupWatchers()
        events2 = []
        self.watchfs.add_watcher(events2.append)
        self.fs.makedir("test1")
        self.assertEventOccurred(CREATED, "/test1")
        self.assertEventOccurred(CREATED, "/test1", event_list=events2)

    def test_watch_readfile(self):
        # Reading a file should surface as ACCESSED (MODIFIED is also
        # accepted on the polling path).
        self.setupWatchers()
        self.fs.setcontents("hello", b("hello world"))
        self.assertEventOccurred(CREATED, "/hello")
        self.clearCapturedEvents()
        old_atime = self.fs.getinfo("hello").get("accessed_time")
        self.assertEqual(self.fs.getcontents("hello"), b("hello world"))
        if not isinstance(self.watchfs, PollingWatchableFS):
            #  Help it along by updating the atime.
            #  TODO: why is this necessary?
            self._bumpAccessTime("hello")
            self.assertEventOccurred(ACCESSED, "/hello")
        elif old_atime is not None:
            #  Some filesystems don't update atime synchronously, or only
            #  update it if it's too old, or don't update it at all!
            #  Try to force the issue, wait for it to change, but eventually
            #  give up and bail out.
            for _ in range(10):
                if self.fs.getinfo("hello").get("accessed_time") != old_atime:
                    if not self.checkEventOccurred(MODIFIED, "/hello"):
                        self.assertEventOccurred(ACCESSED, "/hello")
                    break
                time.sleep(0.2)
                self._bumpAccessTime("hello")

    def test_watch_writefile(self):
        # Overwriting an existing file must produce a MODIFIED event.
        self.setupWatchers()
        self.fs.setcontents("hello", b("hello world"))
        self.assertEventOccurred(CREATED, "/hello")
        self.clearCapturedEvents()
        self.fs.setcontents("hello", b("hello again world"))
        self.assertEventOccurred(MODIFIED, "/hello")

    def test_watch_single_file(self):
        # A watcher limited to MODIFIED on one file must see nothing else,
        # even though the file is removed afterwards.
        self.fs.setcontents("hello", b("hello world"))
        events = []
        self.watchfs.add_watcher(events.append, "/hello", (MODIFIED,))
        self.fs.setcontents("hello", b("hello again world"))
        self.fs.remove("hello")
        self.waitForEvents()
        for evt in events:
            self.assertTrue(isinstance(evt, MODIFIED))
            self.assertEqual(evt.path, "/hello")

    def test_watch_single_file_remove(self):
        # A watcher limited to REMOVED on one file must get exactly one
        # event: the removal, not the preceding rewrite.
        self.fs.makedir("testing")
        self.fs.setcontents("testing/hello", b("hello world"))
        events = []
        self.watchfs.add_watcher(events.append, "/testing/hello", (REMOVED,))
        self.fs.setcontents("testing/hello", b("hello again world"))
        self.waitForEvents()
        self.fs.remove("testing/hello")
        self.waitForEvents()
        self.assertEqual(len(events), 1)
        self.assertTrue(isinstance(events[0], REMOVED))
        self.assertEqual(events[0].path, "/testing/hello")

    def test_watch_iter_changes(self):
        # iter_changes() must yield the events in order and stop once the
        # CLOSED event has been consumed.
        changes = iter_changes(self.watchfs)
        self.fs.makedir("test1")
        self.fs.setcontents("test1/hello", b("hello world"))
        self.waitForEvents()
        self.fs.removedir("test1", force=True)
        self.waitForEvents()
        self.watchfs.close()

        def skip_to(cls, path=None):
            # Consume events until one of type ``cls`` (and, when a path
            # is given, with that path) turns up.
            event = changes.next(timeout=1)
            while not isinstance(event, cls) or (
                    path is not None and event.path != path):
                event = changes.next(timeout=1)
            return event

        #  Locate the CREATED(test1) event
        skip_to(CREATED, "/test1")
        #  Locate the CREATED(test1/hello) event
        skip_to(CREATED, "/test1/hello")
        #  Locate the REMOVED(test1) event
        skip_to(REMOVED, "/test1")
        #  Locate the CLOSED event
        skip_to(CLOSED)
        #  That should be the last event in the list
        self.assertRaises(StopIteration, changes.next, timeout=1)
        changes.close()
from fs import tempfs, osfs
class TestWatchers_TempFS(unittest.TestCase,FSTestCases,WatcherTestCases):
    """Run the watcher testcases against an OSFS opened on a TempFS root."""

    def setUp(self):
        # Changes are made through the TempFS, while events are observed
        # through a separate OSFS opened on the same directory.
        self.fs = tempfs.TempFS()
        watchfs = osfs.OSFS(self.fs.root_path)
        self.watchfs = ensure_watchable(watchfs, poll_interval=0.1)
        # When either native watcher module was importable, check that
        # ensure_watchable() returned something equal to the OSFS it was
        # given.
        if watch_inotify is not None or watch_win32 is not None:
            self.assertEqual(watchfs, self.watchfs)

    def tearDown(self):
        # Close the watching side first, then the TempFS.
        self.watchfs.close()
        self.fs.close()

    def check(self, p):
        """Return whether path ``p`` exists on the filesystem under test."""
        return self.fs.exists(p)
from fs import memoryfs
class TestWatchers_MemoryFS(unittest.TestCase,FSTestCases,WatcherTestCases):
    """Run the watcher testcases against a MemoryFS wrapped in WatchableFS."""

    def setUp(self):
        # A single wrapped object plays both roles: changes are made
        # through it and watchers are registered on it.
        wrapped = WatchableFS(memoryfs.MemoryFS())
        self.fs = wrapped
        self.watchfs = wrapped

    def tearDown(self):
        # Close the watching side first, then the filesystem itself.
        for filesystem in (self.watchfs, self.fs):
            filesystem.close()

    def check(self, p):
        """Return whether path ``p`` exists on the filesystem under test."""
        exists = self.fs.exists(p)
        return exists
class TestWatchers_MemoryFS_polling(TestWatchers_MemoryFS):
    """Same testcases, with the watcher obtained through ensure_watchable().

    Presumably this exercises the polling implementation, since a plain
    MemoryFS is passed in with a poll interval - confirm against fs.watch.
    """

    def setUp(self):
        # Keep the plain MemoryFS as the filesystem under test and watch
        # it through whatever ensure_watchable() returns.
        plain_fs = memoryfs.MemoryFS()
        self.fs = plain_fs
        self.watchfs = ensure_watchable(plain_fs, poll_interval=0.1)
|