File: test_core.py

package info (click to toggle)
obsub 0.2.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 160 kB
  • sloc: python: 351; makefile: 5; sh: 2
file content (134 lines) | stat: -rw-r--r-- 4,051 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
Test the core functionality.

Contains a converted version of all of the doctests.

"""
# test utilities
import unittest
import weakref, gc

# tested module
from obsub import event


class NewStyle(object):
    """Event source fixture that derives explicitly from ``object``."""

    def __init__(self):
        # Number of times the body of ``emit`` has run on this instance.
        self.count = 0

    @event
    def emit(self, first, second):
        # The body only counts the emission; ``first`` and ``second`` are
        # accepted to define the event's signature and are not used here.
        self.count = self.count + 1

class OldStyle:
    """Event source fixture declared without an explicit base class.

    On Python 2 this makes it an old-style class; apart from the missing
    base it mirrors ``NewStyle``.
    """

    def __init__(self):
        # Number of times the body of ``emit`` has run on this instance.
        self.count = 0

    @event
    def emit(self, first, second):
        # The body only counts the emission; ``first`` and ``second`` are
        # accepted to define the event's signature and are not used here.
        self.count = self.count + 1

class Observer(object):
    """Callable event handler that logs every invocation to a shared list."""

    def __init__(self, call_stack):
        """Store ``call_stack``, the list that invocations are appended to."""
        self.call_stack = call_stack

    def __call__(self, source, first, second):
        """Append a record of this notification to ``self.call_stack``.

        The record is ``(self, diff, source, first, second)``.  ``diff`` is
        the number of records already logged for ``source``, minus
        ``source.count``, plus one.  The tests in this file expect it to be
        the zero-based position of this handler among the handlers notified
        by a source's first emission.
        """
        already_logged = len(
            [entry for entry in self.call_stack if entry[2] == source]
        )
        position = already_logged - source.count + 1
        record = (self, position, source, first, second)
        self.call_stack.append(record)

class TestCore(unittest.TestCase):
    """Test the obsub module for new style classes."""
    # Event source class under test; subclasses override this to rerun the
    # same tests against a different source class.
    cls = NewStyle

    def setUp(self):
        """Give every test an empty call log and untruncated failure diffs."""
        self.call_stack = []
        self.maxDiff = None

    def observer(self, instance):
        """Subscribe a new Observer to ``instance.emit`` and return it.

        The observer logs its invocations to ``self.call_stack``.
        """
        observer = Observer(self.call_stack)
        instance.emit += observer
        return observer

    def check_stack(self, expected):
        """Assert that the logged handler invocations equal ``expected``."""
        self.assertEqual(expected, self.call_stack)

    def test_single_handler(self):
        """A single handler is invoked correctly."""
        src = self.cls()
        obs = self.observer(src)
        src.emit("Hello", "World")
        self.check_stack([(obs, 0, src, "Hello", "World")])

    def test_remove_handler(self):
        """Removal of event handlers works correctly."""
        src = self.cls()
        obs = self.observer(src)
        src.emit -= obs
        src.emit("something", "arbitrary")
        self.check_stack([])

    def test_multiple_handlers(self):
        """Multiple handlers are invoked in correct order."""
        src = self.cls()
        obs = [self.observer(src) for _ in range(10)]
        src.emit("Hello", "World")
        self.check_stack([(obs[i], i, src, "Hello", "World")
                          for i in range(10)])

    def test_multiple_sources(self):
        """Multiple instances of the event source class can coexist."""
        src = [self.cls() for _ in range(10)]
        obs = [self.observer(s) for s in src]
        for s in src:
            s.emit("Hello", "World")
        self.check_stack([(obs[i], 0, src[i], "Hello", "World")
                          for i in range(10)])

    def test_keyword_arguments(self):
        """Keyword arguments can be used."""
        src = self.cls()
        obs = self.observer(src)
        src.emit(second="World", first="Hello")
        self.check_stack([(obs, 0, src, "Hello", "World")])

    def test_wrong_signature(self):
        """Any incorrect call signature raises a TypeError."""
        src = self.cls()
        self.assertRaises(TypeError, src.emit, "Hello")
        self.assertRaises(TypeError, src.emit, first="Hello", third="!")
        self.assertRaises(TypeError, src.emit, second="World")
        # Subscribe a handler before the last bad call so that the final
        # check also covers that a failing call does not notify handlers.
        obs = self.observer(src)
        self.assertRaises(TypeError, src.emit, forth=":)")
        self.check_stack([])

    def test_class_based_access(self):
        """The emitter function can be invoked via its class."""
        src = self.cls()
        obs = self.observer(src)
        self.cls.emit(src, "Hello", "World")
        self.check_stack([(obs, 0, src, "Hello", "World")])

    def test_keep_alive(self):
        """A reference to a bound event method keeps its owner instance alive."""
        src = self.cls()
        obs = self.observer(src)
        emit = src.emit

        # remove original reference:
        wr = weakref.ref(src)
        del src
        gc.collect()

        # the emitter function is still functioning:
        # (unittest assertions are used instead of bare ``assert`` so the
        # checks are not stripped when running under ``python -O``)
        self.assertIsNotNone(wr())
        emit("Hello", "World")
        self.check_stack([(obs, 0, wr(), "Hello", "World")])

        # removing the bound method makes the instance disappear:
        del emit
        # the logged call record still references the instance; drop it too
        self.call_stack[0] = None
        gc.collect()
        self.assertIsNone(wr())

# ERRORS:
class TestCoreOldStyle(TestCore):
    """Run every TestCore test with OldStyle as the event source class."""
    cls = OldStyle