1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
|
#!/usr/bin/env python
from collections import defaultdict
from circuits import Component, Event
from circuits.protocols import Line
class read(Event):
"""read Event"""
class App(Component):
lines = []
def line(self, line):
self.lines.append(line)
class AppServer(Component):
channel = 'server'
lines = []
def line(self, sock, line):
self.lines.append((sock, line))
def test():
app = App()
Line().register(app)
while len(app):
app.flush()
app.fire(read(b'1\n2\r\n3\n4'))
while len(app):
app.flush()
assert app.lines[0] == b'1'
assert app.lines[1] == b'2'
assert app.lines[2] == b'3'
def test_server():
app = AppServer()
buffers = defaultdict(bytes)
Line(
getBuffer=buffers.__getitem__,
updateBuffer=buffers.__setitem__,
).register(app)
while len(app):
app.flush()
app.fire(read(1, b'1\n2\r\n3\n4'))
app.fire(read(2, b'1\n2\r\n3\n4'))
while len(app):
app.flush()
assert app.lines[0] == (1, b'1')
assert app.lines[1] == (1, b'2')
assert app.lines[2] == (1, b'3')
assert app.lines[3] == (2, b'1')
assert app.lines[4] == (2, b'2')
assert app.lines[5] == (2, b'3')
|