1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
#!/usr/bin/env python
import os
import pytest
from circuits import Component, handler
try:
from circuits.io.notify import Notify
except ImportError:
pytest.importorskip('pyinotify')
class App(Component):
    """Component that records whether a 'created' event has been handled.

    ``created_status`` starts out ``False`` and is switched to ``True`` by
    the ``created`` handler below.
    """

    def init(self, *args, **kwargs):
        # Nothing has been observed yet.
        # NOTE(review): presumably invoked by circuits during component
        # construction -- confirm against the circuits Component docs.
        self.created_status = False

    @handler('created', channel='notify')
    def created(self, *args, **kwargs):
        # Handler for 'created' on the 'notify' channel: remember that it ran.
        self.created_status = True
class Creator:
    """Create entries below ``tmpdir`` and check whether 'created' was seen.

    ``app`` must expose a ``created_status`` attribute, ``watcher`` must
    provide ``wait(name, timeout=...)`` and ``clear()``, and ``tmpdir`` must
    provide ``ensure(path, dir=...)``. ``timeout`` is passed unchanged to
    ``watcher.wait``.
    """

    def __init__(self, app, watcher, tmpdir, timeout=0.5):
        self.app = app
        self.watcher = watcher
        self.tmpdir = tmpdir
        self.timeout = timeout

    def create(self, *targets, **kwargs):
        """Ensure a path exists and assert on the resulting 'created' status.

        :param targets: path components, joined with ``os.path.join`` and
            passed to ``tmpdir.ensure``.
        :param assert_created: keyword option (default ``True``); the value
            ``app.created_status`` is expected to have after waiting.
        :param dir: keyword option (default ``False``); forwarded to
            ``tmpdir.ensure`` as ``dir``.
        :raises TypeError: if any other keyword option is given.
        :raises AssertionError: if ``app.created_status`` differs from
            ``assert_created`` after the wait.
        """
        assert_created = kwargs.pop('assert_created', True)
        as_dir = kwargs.pop('dir', False)
        # Reject misspelled options up front instead of silently falling
        # back to the defaults, which would change what the test verifies.
        if kwargs:
            raise TypeError(
                'create() got unexpected keyword argument(s): %s'
                % ', '.join(sorted(kwargs))
            )

        target = os.path.join(*targets)
        self.tmpdir.ensure(target, dir=as_dir)
        self.watcher.wait('created', timeout=self.timeout)
        assert self.app.created_status == assert_created, (
            'created_status for %r was %r, expected %r'
            % (target, self.app.created_status, assert_created)
        )

        # Reset for next call
        self.watcher.clear()
        self.app.created_status = False
@pytest.fixture()
def app(manager, watcher):
    """Yield the result of ``App().register(manager)``, then unregister it.

    After the test finishes, the yielded object is unregistered and the
    fixture waits on ``watcher`` for 'unregistered'.
    """
    component = App().register(manager)
    yield component

    # Teardown: runs once the test using this fixture is done.
    component.unregister()
    watcher.wait('unregistered')
@pytest.fixture()
def notify(app, watcher):
    """Return the result of ``Notify().register(app)``.

    The fixture waits on ``watcher`` for 'registered' before handing the
    object to the test.
    """
    watch_component = Notify().register(app)
    watcher.wait('registered')
    return watch_component
@pytest.fixture()
def creator(app, watcher, tmpdir):
    """Return a ``Creator`` built from the given fixtures, default timeout."""
    helper = Creator(app, watcher, tmpdir)
    return helper
# TESTS
def test_notify_file(notify, tmpdir, creator):
    """A file is reported while its directory is watched, not afterwards."""
    watched = str(tmpdir)

    # Watch the directory: creating a file must set the 'created' status.
    notify.add_path(watched)
    creator.create('helloworld.txt')

    # Drop the watch: creating another file must leave the status unset.
    notify.remove_path(watched)
    creator.create('helloworld2.txt', assert_created=False)
def test_notify_dir(notify, tmpdir, creator):
    """A directory is reported while its parent is watched, not afterwards."""
    watched = str(tmpdir)

    # Watch the directory: creating a subdirectory must set the status.
    notify.add_path(watched)
    creator.create('hellodir', dir=True)

    # Drop the watch: creating another subdirectory must go unreported.
    notify.remove_path(watched)
    creator.create('hellodir2', dir=True, assert_created=False)
def test_notify_subdir_recursive(notify, tmpdir, creator):
    """A recursive watch reports a file created in an existing subdirectory."""
    child = 'sub'

    # The subdirectory is created before the watch is added.
    tmpdir.ensure(child, dir=True)

    # Watch the top-level directory recursively.
    notify.add_path(str(tmpdir), recursive=True)

    # A file inside the subdirectory must set the 'created' status.
    creator.create(child, 'helloworld.txt', assert_created=True)
@pytest.mark.xfail(reason='pyinotify issue #133')
def test_notify_subdir_recursive_remove_path(notify, tmpdir, creator):
    """After a recursive remove, a file in the subdirectory is not reported.

    Logically the second half of ``test_notify_subdir_recursive``; kept
    separate and marked xfail because pyinotify fails on
    rm_watch(...., rec=True).
    """
    child = 'sub'
    watched = str(tmpdir)

    # The subdirectory is created before the watch is added.
    tmpdir.ensure(child, dir=True)

    # Add the recursive watch, then remove it again recursively.
    notify.add_path(watched, recursive=True)
    notify.remove_path(watched, recursive=True)

    # A file inside the subdirectory must leave the status unset.
    creator.create(child, 'helloworld2.txt', assert_created=False)
def test_notify_subdir_recursive_auto_add(notify, tmpdir, creator):
    """With a recursive watch, a new subdirectory and a file in it are both reported."""
    child = 'sub'

    # Watch the top-level directory recursively.
    notify.add_path(str(tmpdir), recursive=True)

    # Creating the subdirectory must set the 'created' status.
    creator.create(child, dir=True, assert_created=True)

    # Creating a file inside the new subdirectory must set it as well.
    creator.create(child, 'helloworld.txt', assert_created=True)

    # notify.remove_path() is skipped because pyinotify is broken.
def test_notify_subdir_recursive_no_auto_add(notify, tmpdir, creator):
    """With ``auto_add=False``, a file in a newly created subdirectory is not reported."""
    child = 'sub'

    # Watch the top-level directory recursively, with auto_add switched off.
    notify.add_path(str(tmpdir), recursive=True, auto_add=False)

    # Creating the subdirectory itself must still set the 'created' status.
    creator.create(child, dir=True, assert_created=True)

    # A file inside the new subdirectory must leave the status unset.
    creator.create(child, 'helloworld.txt', assert_created=False)

    # notify.remove_path() is skipped because pyinotify is broken.
|