1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
#!/usr/bin/env python
from time import sleep
import pytest
from circuits import Component, Event
from circuits.net.events import close
from circuits.net.sockets import UDPServer
from circuits.node import Node
pytestmark = pytest.mark.skipif(pytest.PLATFORM == 'win32', reason='Broken on Windows')
class return_value(Event):
success = True
class App(Component):
def return_value(self, event):
print('Hello client!', event.channels)
@pytest.fixture()
def bind(manager, watcher):
server = UDPServer(0).register(manager)
assert watcher.wait('ready', channel='server')
host, port = server.host, server.port
server.fire(close())
assert watcher.wait('closed', channel='server')
server.unregister()
assert watcher.wait('unregistered', channel='server')
return host, port
@pytest.fixture()
def app(manager, watcher, bind):
server = Node(port=bind[1], server_ip=bind[0])
server.register(manager)
server.bind = bind
assert watcher.wait('registered', channel='node')
return server
def test_auto_reconnect(app, watcher, manager):
# add client
client = App().register(manager)
node = Node().register(client)
chan = node.add('client1', *app.bind, reconnect_delay=1, connect_timeout=1)
assert watcher.wait('connected', channel=chan)
watcher.clear()
# close server
app.fire(close(), app.channel)
assert watcher.wait('closed', channel=app.channel)
watcher.clear()
# client gets an unreachable
assert watcher.wait('connect', channel=chan)
assert watcher.wait('unreachable', channel=chan)
watcher.clear()
# start a new server
node2 = Node(port=app.bind[1], server_ip=app.bind[0])
node2.register(manager)
assert watcher.wait('ready', channel=node2.channel)
watcher.clear()
assert watcher.wait('connected', channel=chan)
client.unregister()
def test_server_send_all(app, watcher, manager):
client1 = App().register(manager)
node1 = Node().register(client1)
chan = node1.add('client1', *app.bind)
assert watcher.wait('connected', channel=chan)
client2 = App().register(manager)
node2 = Node().register(client2)
chan = node2.add('client2', *app.bind)
assert watcher.wait('connected', channel=chan)
event = return_value()
app.server.send_all(event)
assert watcher.wait('return_value')
client1.unregister()
client2.unregister()
def test_server_send(app, watcher, manager):
client1 = App().register(manager)
node1 = Node().register(client1)
chan1 = node1.add('client1', *app.bind)
assert watcher.wait('connected', channel=chan1)
client2 = App().register(manager)
node2 = Node().register(client2)
chan2 = node2.add('client2', *app.bind)
assert watcher.wait('connected', channel=chan2)
event = return_value()
app.server.send(event, app.server.get_socks()[0], no_result=True)
assert watcher.wait('return_value')
client1.unregister()
client2.unregister()
def test_server_send_multicast(app, watcher, manager):
client1 = App().register(manager)
node1 = Node().register(client1)
chan1 = node1.add('client1', *app.bind)
assert watcher.wait('connected', channel=chan1)
watcher.clear()
client2 = App().register(manager)
node2 = Node().register(client2)
chan2 = node2.add('client2', *app.bind)
assert watcher.wait('connected', channel=chan2)
watcher.clear()
client3 = App().register(manager)
node3 = Node().register(client3)
chan3 = node3.add('client3', *app.bind)
assert watcher.wait('connected', channel=chan3)
watcher.clear()
event = return_value()
app.server.send_to(event, app.server.get_socks())
assert watcher.wait('return_value')
for _ in range(3):
if watcher.count('return_value') == 3:
break
sleep(1)
assert watcher.count('return_value') == 3
client1.unregister()
client2.unregister()
client3.unregister()
|