1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
"""
Test the core functionality.
Contains a converted version of all of the doctests.
"""
# test utilities
import unittest
import weakref, gc
# tested module
from obsub import event
class NewStyle(object):
def __init__(self):
self.count = 0
@event
def emit(self, first, second):
self.count += 1
class OldStyle:
def __init__(self):
self.count = 0
@event
def emit(self, first, second):
self.count += 1
class Observer(object):
def __init__(self, call_stack):
self.call_stack = call_stack
def __call__(self, source, first, second):
diff = (sum(1 for call in self.call_stack if call[2] == source)
- source.count) + 1
self.call_stack.append((self, diff, source, first, second))
class TestCore(unittest.TestCase):
"""Test the obsub module for new style classes."""
cls = NewStyle
def setUp(self):
self.call_stack = []
self.maxDiff = None
def observer(self, instance):
observer = Observer(self.call_stack)
instance.emit += observer
return observer
def check_stack(self, expected):
self.assertEqual(expected, self.call_stack)
def test_single_handler(self):
"""A single handler is invoked correctly."""
src = self.cls()
obs = self.observer(src)
src.emit("Hello", "World")
self.check_stack([(obs, 0, src, "Hello", "World")])
def test_remove_handler(self):
"""Removal of event handlers works correctly."""
src = self.cls()
obs = self.observer(src)
src.emit -= obs
src.emit("something", "arbitrary")
self.check_stack([])
def test_multiple_handlers(self):
"""Multiple handlers are invoked in correct order."""
src = self.cls()
obs = [self.observer(src) for i in range(10)]
src.emit("Hello", "World")
self.check_stack([(obs[i], i, src, "Hello", "World")
for i in range(10)])
def test_multiple_sources(self):
"""Multiple instances of the event source class can coexist."""
src = [self.cls() for i in range(10)]
obs = [self.observer(s) for s in src]
for s in src:
s.emit("Hello", "World")
self.check_stack([(obs[i], 0, src[i], "Hello", "World")
for i in range(10)])
def test_keyword_arguments(self):
"""Keyword arguments can be used."""
src = self.cls()
obs = self.observer(src)
src.emit(second="World", first="Hello")
self.check_stack([(obs, 0, src, "Hello", "World")])
def test_wrong_signature(self):
"""Any incorrect call signature raises a TypeError."""
src = self.cls()
self.assertRaises(TypeError, src.emit, "Hello")
self.assertRaises(TypeError, src.emit, first="Hello", third="!")
self.assertRaises(TypeError, src.emit, second="World")
obs = self.observer(src)
self.assertRaises(TypeError, src.emit, forth=":)")
self.check_stack([])
def test_class_based_access(self):
"""The emitter function can be invoked via its class."""
src = self.cls()
obs = self.observer(src)
self.cls.emit(src, "Hello", "World")
self.check_stack([(obs, 0, src, "Hello", "World")])
def test_keep_alive(self):
"""A reference to a bound event method keeps its owner instance alive."""
src = self.cls()
obs = self.observer(src)
emit = src.emit
# remove original reference:
wr = weakref.ref(src)
del src
gc.collect()
# the emitter function is still functioning:
assert wr() is not None
emit("Hello", "World")
self.check_stack([(obs, 0, wr(), "Hello", "World")])
# removing the bound method makes the instance disappear:
del emit
self.call_stack[0] = None
gc.collect()
assert wr() is None
# ERRORS:
class TestCoreOldStyle(TestCore):
cls = OldStyle
|