File: test_reloader.py

package info (click to toggle)
python-hupper 1.12-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 344 kB
  • sloc: python: 1,668; makefile: 149
file content (123 lines) | stat: -rw-r--r-- 3,108 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os

here = os.path.abspath(os.path.dirname(__file__))


class DummyCallback:
    called = False

    def __call__(self, paths):
        self.called = paths


def make_proxy(monitor_factory, callback, logger):
    from hupper.reloader import FileMonitorProxy

    proxy = FileMonitorProxy(callback, logger)
    proxy.monitor = monitor_factory(proxy.file_changed)
    return proxy


def test_proxy_proxies(logger):
    class DummyMonitor(object):
        started = stopped = joined = False

        def __call__(self, cb, **kw):
            self.cb = cb
            return self

        def start(self):
            self.started = True

        def stop(self):
            self.stopped = True

        def join(self):
            self.joined = True

    cb = DummyCallback()
    monitor = DummyMonitor()
    proxy = make_proxy(monitor, cb, logger)
    assert monitor.cb
    assert not monitor.started and not monitor.stopped and not monitor.joined
    proxy.start()
    assert monitor.started and not monitor.stopped and not monitor.joined
    proxy.stop()
    assert monitor.stopped and monitor.joined


def test_proxy_expands_paths(tmpdir, logger):
    class DummyMonitor(object):
        def __call__(self, cb, **kw):
            self.cb = cb
            self.paths = []
            return self

        def add_path(self, path):
            self.paths.append(path)

    cb = DummyCallback()
    monitor = DummyMonitor()
    proxy = make_proxy(monitor, cb, logger)
    proxy.add_path('foo')
    assert monitor.paths == ['foo']

    tmpdir.join('foo.txt').ensure()
    tmpdir.join('bar.txt').ensure()
    rootdir = tmpdir.strpath
    monitor.paths = []
    proxy.add_path(os.path.join(rootdir, '*.txt'))
    assert sorted(monitor.paths) == [
        os.path.join(rootdir, 'bar.txt'),
        os.path.join(rootdir, 'foo.txt'),
    ]


def test_proxy_tracks_changes(logger):
    class DummyMonitor(object):
        def __call__(self, cb, **kw):
            self.cb = cb
            return self

    cb = DummyCallback()
    monitor = DummyMonitor()
    proxy = make_proxy(monitor, cb, logger)
    monitor.cb('foo.txt')
    assert cb.called == {'foo.txt'}
    out = logger.get_output('info')
    assert out == 'foo.txt changed; reloading ...'
    logger.reset()
    monitor.cb('foo.txt')
    out = logger.get_output('info')
    assert out == ''
    logger.reset()
    cb.called = False
    proxy.clear_changes()
    monitor.cb('foo.txt')
    out = logger.get_output('info')
    assert out == 'foo.txt changed; reloading ...'
    logger.reset()


def test_ignore_files():
    class DummyMonitor(object):
        paths = set()

        def add_path(self, path):
            self.paths.add(path)

    from hupper.reloader import FileMonitorProxy

    cb = DummyCallback()
    proxy = FileMonitorProxy(cb, None, {'/a/*'})
    monitor = proxy.monitor = DummyMonitor()

    path = 'foo.txt'
    assert path not in monitor.paths
    proxy.add_path(path)
    assert path in monitor.paths

    path = '/a/foo.txt'
    assert path not in monitor.paths
    proxy.add_path(path)
    assert path not in monitor.paths