File: amptube.py

package info (click to toggle)
python-tubes 0.2.1-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 740 kB
  • sloc: python: 3,215; makefile: 149
file content (142 lines) | stat: -rw-r--r-- 3,533 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from zope.interface import implementer

from twisted.internet.endpoints import serverFromString, clientFromString

from twisted.internet.defer import Deferred, inlineCallbacks

from twisted.internet.protocol import Factory
from twisted.protocols.amp import AmpBox, Command, IBoxSender, Integer, AMP, CommandLocator, BoxDispatcher

from tubes.protocol import flowFountFromEndpoint
from tubes.listening import Listener
from tubes.itube import ISegment
from tubes.tube import tube, series
from tubes.framing import bytesToIntPrefixed

@tube
class StringsToBoxes:
    """
    Assemble a stream of alternating key / value items into AmpBox objects.

    The tube is a small state machine driven by C{self.state}:

      - C{'new'}: no box is in progress; the next item starts a fresh box and
        is then treated as a key.
      - C{'key'}: the next item is a key, or an empty item that terminates
        the current box.
      - C{'value'}: the next item is the value for the most recent key.
    """

    inputType = None            # I... Packet? IString? IDatagram?
    outputType = None           # AmpBox -> TODO, implement classes.

    state = 'new'

    def received(self, item):
        """
        Dispatch C{item} to the handler named after the current state.
        """
        handler = getattr(self, 'received_' + self.state)
        return handler(item)


    def received_new(self, item):
        """
        Start a fresh box, then handle C{item} as that box's first key.
        """
        self._currentBox = AmpBox()
        return self.received_key(item)


    def received_key(self, item):
        """
        Handle a key item; an empty item finishes the box and yields it.

        This is a generator, so the state change below only takes effect
        once the returned iterator is consumed.
        """
        if not item:
            # Empty key: the box is complete.
            self.state = 'new'
            yield self._currentBox
            return
        self._currentKey = item
        self.state = 'value'


    def received_value(self, item):
        """
        Store C{item} under the pending key and go back to expecting a key.
        """
        self._currentBox[self._currentKey] = item
        self.state = 'key'



@tube
class BoxesToData:
    """
    Shortcut: I want to go from boxes directly to data.

    Each received box is turned into whatever its C{serialize()} method
    returns, and that single value is passed downstream.
    """
    inputType = None            # AmpBox
    outputType = ISegment

    def received(self, item):
        """
        Yield the serialized form of the box C{item}.
        """
        serialized = item.serialize()
        yield serialized


@implementer(IBoxSender)
class BufferingBoxSender(object):
    """
    A box sender that collects outgoing boxes in a list instead of writing
    them anywhere.

    Boxes accumulate in C{boxesToSend}; whoever owns this object is expected
    to read and reset that list.
    """
    def __init__(self):
        # Boxes queued by sendBox, in the order they were sent.
        self.boxesToSend = []

    def sendBox(self, box):
        """
        Queue C{box} for later delivery.
        """
        self.boxesToSend.append(box)

    def unhandledError(self, failure):
        """
        Log C{failure} through Twisted's logging system.

        The method takes C{self} so it can be called on an instance; without
        it, calling C{sender.unhandledError(failure)} raises C{TypeError}.
        """
        from twisted.python import log
        log.err(failure)


@tube
class BoxConsumer:
    """
    Feed incoming boxes to a box receiver and emit the boxes it sends back.

    The receiver is given a L{BufferingBoxSender} as its sender, so anything
    it sends while handling a box is captured and returned from C{received}.
    """

    inputType = None            # AmpBox
    outputType = None           # AmpBox

    def __init__(self, boxReceiver):
        """
        @param boxReceiver: the object whose C{startReceivingBoxes} and
            C{ampBoxReceived} methods will be called.
        """
        self.bbs = BufferingBoxSender()
        self.boxReceiver = boxReceiver


    def started(self):
        """
        Hand the buffering sender to the receiver.
        """
        self.boxReceiver.startReceivingBoxes(self.bbs)


    def unhandledError(self, failure):
        """
        Print the traceback of C{failure}.
        """
        failure.printTraceback()


    def received(self, box):
        """
        Deliver C{box} to the receiver and return the boxes buffered so far.

        The buffer is replaced with a fresh list, so each box is returned
        only once.
        """
        self.boxReceiver.ampBoxReceived(box)
        pending, self.bbs.boxesToSend = self.bbs.boxesToSend, []
        return pending



class Add(Command):
    """
    AMP command taking two integers, C{a} and C{b}, and answering with one
    integer, C{result}.
    """
    arguments = [
        (b'a', Integer()),
        (b'b', Integer()),
    ]
    response = [
        (b'result', Integer()),
    ]


class Math(CommandLocator):
    """
    Command locator providing the responder for L{Add}.
    """
    @Add.responder
    def add(self, a, b):
        """
        Respond to L{Add} with the sum of C{a} and C{b}.
        """
        total = a + b
        return {'result': total}


def mathFlow(flow):
    """
    Wire one connection's fount and drain into the AMP math pipeline.

    Data from C{flow.fount} is split into 16-bit length-prefixed strings,
    assembled into boxes, dispatched to L{Math}, and the resulting boxes are
    serialized and delivered to C{flow.drain}.

    @param flow: an object exposing C{fount} and C{drain} attributes.
    """
    dispatcher = BoxDispatcher(Math())
    pipeline = series(
        bytesToIntPrefixed(16),
        StringsToBoxes(),
        BoxConsumer(dispatcher),
        BoxesToData(),
        flow.drain,
    )
    flow.fount.flowTo(pipeline)



@inlineCallbacks
def main(reactor, type="server"):
    """
    Run the example as a server or as a client.

    With C{type == "server"}, listen on TCP port 1234 and route every
    connection through L{mathFlow}.  With any other value, connect to
    localhost:1234 using the stock L{AMP} protocol and print the result of
    twenty C{Add(a=1, b=2)} calls.

    In both modes the function finishes by waiting on a L{Deferred} that
    nothing here ever fires, so it does not complete on its own.

    @param reactor: the reactor used to build the endpoints.
    @param type: C{"server"} for server mode; anything else means client.
    """
    if type != "server":
        endpoint = clientFromString(reactor, "tcp:localhost:1234")
        protocol = yield endpoint.connect(Factory.forProtocol(AMP))
        for _ in range(20):
            answer = yield protocol.callRemote(Add, a=1, b=2)
            print(answer)
    else:
        endpoint = serverFromString(reactor, "tcp:1234")
        listening = yield flowFountFromEndpoint(endpoint)
        listening.flowTo(Listener(mathFlow))
    # Wait forever: this Deferred is never fired.
    yield Deferred()


from twisted.internet.task import react
from sys import argv
# Script entry point: hand main() to react() together with the command-line
# arguments after the program name.  react() is expected to pass them on as
# extra positional arguments, so the first one (if any) becomes main()'s
# `type`.  NOTE: this runs at import time; there is no __main__ guard.
react(main, argv[1:])