1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
# Copyright 2015 Hardcoded Software (http://www.hardcoded.net)
# This software is licensed under the "GPLv3" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.gnu.org/licenses/gpl-3.0.html
from hscommon.testutil import eq_
from hscommon.notify import Broadcaster, Listener, Repeater
class HelloListener(Listener):
def __init__(self, broadcaster):
Listener.__init__(self, broadcaster)
self.hello_count = 0
def hello(self):
self.hello_count += 1
class HelloRepeater(Repeater):
def __init__(self, broadcaster):
Repeater.__init__(self, broadcaster)
self.hello_count = 0
def hello(self):
self.hello_count += 1
def create_pair():
b = Broadcaster()
listener = HelloListener(b)
return b, listener
def test_disconnect_during_notification():
# When a listener disconnects another listener the other listener will not receive a
# notification.
# This whole complication scheme below is because the order of the notification is not
# guaranteed. We could disconnect everything from self.broadcaster.listeners, but this
# member is supposed to be private. Hence, the '.other' scheme
class Disconnecter(Listener):
def __init__(self, broadcaster):
Listener.__init__(self, broadcaster)
self.hello_count = 0
def hello(self):
self.hello_count += 1
self.other.disconnect()
broadcaster = Broadcaster()
first = Disconnecter(broadcaster)
second = Disconnecter(broadcaster)
first.other, second.other = second, first
first.connect()
second.connect()
broadcaster.notify("hello")
# only one of them was notified
eq_(first.hello_count + second.hello_count, 1)
def test_disconnect():
# After a disconnect, the listener doesn't hear anything.
b, listener = create_pair()
listener.connect()
listener.disconnect()
b.notify("hello")
eq_(listener.hello_count, 0)
def test_disconnect_when_not_connected():
# When disconnecting an already disconnected listener, nothing happens.
b, listener = create_pair()
listener.disconnect()
def test_not_connected_on_init():
# A listener is not initialized connected.
b, listener = create_pair()
b.notify("hello")
eq_(listener.hello_count, 0)
def test_notify():
# The listener listens to the broadcaster.
b, listener = create_pair()
listener.connect()
b.notify("hello")
eq_(listener.hello_count, 1)
def test_reconnect():
# It's possible to reconnect a listener after disconnection.
b, listener = create_pair()
listener.connect()
listener.disconnect()
listener.connect()
b.notify("hello")
eq_(listener.hello_count, 1)
def test_repeater():
b = Broadcaster()
r = HelloRepeater(b)
listener = HelloListener(r)
r.connect()
listener.connect()
b.notify("hello")
eq_(r.hello_count, 1)
eq_(listener.hello_count, 1)
def test_repeater_with_repeated_notifications():
# If REPEATED_NOTIFICATIONS is not empty, only notifs in this set are repeated (but they're
# still dispatched locally).
class MyRepeater(HelloRepeater):
REPEATED_NOTIFICATIONS = {"hello"}
def __init__(self, broadcaster):
HelloRepeater.__init__(self, broadcaster)
self.foo_count = 0
def foo(self):
self.foo_count += 1
b = Broadcaster()
r = MyRepeater(b)
listener = HelloListener(r)
r.connect()
listener.connect()
b.notify("hello")
b.notify("foo") # if the repeater repeated this notif, we'd get a crash on HelloListener
eq_(r.hello_count, 1)
eq_(listener.hello_count, 1)
eq_(r.foo_count, 1)
def test_repeater_doesnt_try_to_dispatch_to_self_if_it_cant():
# if a repeater doesn't handle a particular message, it doesn't crash and simply repeats it.
b = Broadcaster()
r = Repeater(b) # doesnt handle hello
listener = HelloListener(r)
r.connect()
listener.connect()
b.notify("hello") # no crash
eq_(listener.hello_count, 1)
def test_bind_messages():
b, listener = create_pair()
listener.bind_messages({"foo", "bar"}, listener.hello)
listener.connect()
b.notify("foo")
b.notify("bar")
b.notify("hello") # Normal dispatching still work
eq_(listener.hello_count, 3)
|