1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
|
#!/usr/bin/env python
from collections import defaultdict
from circuits.protocols import Line
from circuits import Event, Component
class read(Event):
    """Event carrying raw data to be split into lines.

    The tests in this file fire it either with a single bytes payload or
    with a socket identifier followed by a bytes payload.
    """
class App(Component):
    """Component that records every line delivered to its ``line`` handler.

    Received lines are kept, in arrival order, in the ``lines`` list of the
    instance.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the component and give it its own empty ``lines`` list.

        All arguments are passed through unchanged to ``Component``.
        """
        super(App, self).__init__(*args, **kwargs)
        # Created per instance on purpose: a class-level ``lines = []`` is a
        # single list shared (and mutated) by every App instance, so lines
        # recorded by one instance would leak into the next one.
        self.lines = []

    def line(self, line):
        """Append the received ``line`` to ``self.lines``."""
        self.lines.append(line)
class AppServer(Component):
    """Component on the ``"server"`` channel recording ``(sock, line)`` pairs.

    Each call to the ``line`` handler stores the socket identifier together
    with the line, in arrival order, in the ``lines`` list of the instance.
    """

    channel = "server"

    def __init__(self, *args, **kwargs):
        """Initialise the component and give it its own empty ``lines`` list.

        All arguments are passed through unchanged to ``Component``.
        """
        super(AppServer, self).__init__(*args, **kwargs)
        # Created per instance on purpose: a class-level ``lines = []`` is a
        # single list shared (and mutated) by every AppServer instance, so
        # pairs recorded by one instance would leak into the next one.
        self.lines = []

    def line(self, sock, line):
        """Append the ``(sock, line)`` pair to ``self.lines``."""
        self.lines.append((sock, line))
def test():
    """Check line splitting for a single, socket-less data stream.

    A ``Line`` component is registered on an ``App``; one ``read`` event is
    fired whose payload mixes ``\\n`` and ``\\r\\n`` terminators and ends with
    an unterminated ``4``.  The first three recorded lines must be ``b"1"``,
    ``b"2"`` and ``b"3"``; nothing is asserted about the trailing fragment.
    """
    app = App()
    Line().register(app)

    def drain():
        # Keep flushing for as long as the component evaluates as truthy.
        while app:
            app.flush()

    drain()
    app.fire(read(b"1\n2\r\n3\n4"))
    drain()

    # Index-based checks, one per expected line, in arrival order.
    for position, expected in enumerate((b"1", b"2", b"3")):
        assert app.lines[position] == expected
def test_server():
    """Check line splitting when data arrives tagged with a socket id.

    The ``Line`` component is given ``getBuffer``/``updateBuffer`` callables
    backed by a ``defaultdict(bytes)``.  The same payload is fired for
    sockets ``1`` and ``2``; the recorded ``(sock, line)`` pairs must be the
    three complete lines for socket 1 followed by the three complete lines
    for socket 2.
    """
    app = AppServer()
    buffers = defaultdict(bytes)

    protocol = Line(
        getBuffer=buffers.__getitem__,
        updateBuffer=buffers.__setitem__
    )
    protocol.register(app)

    def drain():
        # Keep flushing for as long as the component evaluates as truthy.
        while app:
            app.flush()

    drain()
    for sock in (1, 2):
        app.fire(read(sock, b"1\n2\r\n3\n4"))
    drain()

    # Expected pairs in arrival order: all of socket 1, then all of socket 2.
    expected_pairs = [
        (sock, text)
        for sock in (1, 2)
        for text in (b"1", b"2", b"3")
    ]
    for position, pair in enumerate(expected_pairs):
        assert app.lines[position] == pair
|