1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
import os
here = os.path.abspath(os.path.dirname(__file__))
class DummyCallback:
    """Callable test double that remembers the argument of its latest call."""

    # Class-level default: reads as False on any instance that has not been
    # invoked yet; an invocation shadows it with an instance attribute.
    called = False

    def __call__(self, paths):
        """Record ``paths`` on the instance so a test can inspect it later."""
        self.called = paths
def make_proxy(monitor_factory, callback, logger):
    """Create a ``FileMonitorProxy`` and attach a monitor to it.

    ``monitor_factory`` is called with the proxy's ``file_changed`` bound
    method; whatever it returns is stored as ``proxy.monitor``. The proxy
    itself is built from ``callback`` and ``logger`` and is returned.
    """
    # Imported at call time rather than when this module is imported.
    from hupper.reloader import FileMonitorProxy

    wired = FileMonitorProxy(callback, logger)
    change_hook = wired.file_changed
    wired.monitor = monitor_factory(change_hook)
    return wired
def test_proxy_proxies(logger):
    """``proxy.start()`` / ``proxy.stop()`` reach the wrapped monitor."""

    class DummyMonitor(object):
        """Monitor stub that flags which lifecycle methods were invoked."""

        started = False
        stopped = False
        joined = False

        def __call__(self, cb, **kw):
            # Doubles as its own factory: keep the callback, hand back self.
            self.cb = cb
            return self

        def start(self):
            self.started = True

        def stop(self):
            self.stopped = True

        def join(self):
            self.joined = True

    callback = DummyCallback()
    monitor = DummyMonitor()
    proxy = make_proxy(monitor, callback, logger)

    # The factory call must have handed the monitor a callback, and nothing
    # may have been started, stopped or joined yet.
    assert monitor.cb
    assert not (monitor.started or monitor.stopped or monitor.joined)

    proxy.start()
    assert monitor.started and not (monitor.stopped or monitor.joined)

    # Stopping the proxy is expected to both stop and join the monitor.
    proxy.stop()
    assert monitor.stopped and monitor.joined
def test_proxy_expands_paths(tmpdir, logger):
    """``add_path`` forwards a plain path and expands a ``*.txt`` pattern.

    The plain path ``'foo'`` must arrive at the monitor unchanged, while a
    glob over the temporary directory must yield one entry per created file.
    """

    class DummyMonitor(object):
        """Monitor stub that collects every path it is asked to watch."""

        def __call__(self, cb, **kw):
            # Doubles as its own factory; also starts with an empty path list.
            self.cb = cb
            self.paths = []
            return self

        def add_path(self, path):
            self.paths.append(path)

    callback = DummyCallback()
    monitor = DummyMonitor()
    proxy = make_proxy(monitor, callback, logger)

    proxy.add_path('foo')
    assert monitor.paths == ['foo']

    # Create the files the glob pattern below is expected to match.
    for filename in ('foo.txt', 'bar.txt'):
        tmpdir.join(filename).ensure()
    rootdir = tmpdir.strpath

    # Start from a clean slate so only the glob results are compared.
    monitor.paths = []
    proxy.add_path(os.path.join(rootdir, '*.txt'))

    expected = [os.path.join(rootdir, name) for name in ('bar.txt', 'foo.txt')]
    assert sorted(monitor.paths) == expected
def test_proxy_tracks_changes(logger):
    """A changed path is logged once, and again after ``clear_changes``."""

    class DummyMonitor(object):
        """Monitor stub that only keeps the callback it was created with."""

        def __call__(self, cb, **kw):
            self.cb = cb
            return self

    callback = DummyCallback()
    monitor = DummyMonitor()
    proxy = make_proxy(monitor, callback, logger)

    changed_path = 'foo.txt'
    reload_message = 'foo.txt changed; reloading ...'

    # First notification: the callback receives the path set and the change
    # is reported on the 'info' channel.
    monitor.cb(changed_path)
    assert callback.called == {'foo.txt'}
    assert logger.get_output('info') == reload_message
    logger.reset()

    # Same path again: nothing new is logged.
    monitor.cb(changed_path)
    assert logger.get_output('info') == ''
    logger.reset()

    # After clearing the tracked changes the same path is logged afresh.
    callback.called = False
    proxy.clear_changes()
    monitor.cb(changed_path)
    assert logger.get_output('info') == reload_message
    logger.reset()
def test_ignore_files():
    """Paths matching the ignore pattern ``/a/*`` never reach the monitor."""

    class DummyMonitor(object):
        """Monitor stub that collects added paths in a set."""

        # Class-level set; this test only ever creates one instance.
        paths = set()

        def add_path(self, path):
            self.paths.add(path)

    from hupper.reloader import FileMonitorProxy

    callback = DummyCallback()
    proxy = FileMonitorProxy(callback, None, {'/a/*'})
    monitor = DummyMonitor()
    proxy.monitor = monitor

    # Each case: (path handed to the proxy, whether the monitor should see it).
    cases = (
        ('foo.txt', True),
        ('/a/foo.txt', False),
    )
    for path, should_be_watched in cases:
        assert path not in monitor.paths
        proxy.add_path(path)
        assert (path in monitor.paths) == should_be_watched
|