File: amprouter.py

package info (click to toggle)
epsilon 0.7.0-2~bpo70%2B1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy-backports
  • size: 636 kB
  • sloc: python: 5,454; makefile: 15; sh: 5
file content (205 lines) | stat: -rw-r--r-- 6,407 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# -*- test-case-name: epsilon.test.test_amprouter -*-
# Copyright (c) 2008 Divmod.  See LICENSE for details.

"""
This module provides an implementation of I{Routes}, a system for multiplexing
multiple L{IBoxReceiver}/I{IBoxSender} pairs over a single L{AMP} connection.
"""

from itertools import count

from zope.interface import implements

from twisted.protocols.amp import IBoxReceiver, IBoxSender

from epsilon.structlike import record

# Python 2 idiom: classes in this module declared without an explicit base
# (e.g. ``class Router:``) become new-style classes.
__metaclass__ = type

# Key under which a route name is stored in an AMP box: written by
# Route.sendBox and popped by Router.ampBoxReceived.
_ROUTE = '_route'
# Sentinel meaning "no value supplied".  It is distinct from None because
# None is itself a meaningful route name (the default, un-named route).
_unspecified = object()


class RouteNotConnected(Exception):
    """
    Raised when AMP boxes are sent through a L{Route} whose remote route name
    has not been set yet, i.e. the route has not been connected to a peer.
    """



class Route(record('router receiver localRouteName remoteRouteName',
                   remoteRouteName=_unspecified)):
    """
    Wrap up a route name and a box sender to transparently add the route name
    to boxes sent through this box sender.

    @ivar router: The L{Router} which created this route.  This will be used
        for route tear down and for its L{IBoxSender}, to send boxes.

    @ivar receiver: The receiver which will be started with this object as its
        sender.

    @type localRouteName: C{unicode}
    @ivar localRouteName: The name of this route as known by the other side of
        the AMP connection.  AMP boxes with this route are expected to be
        routed to this object.

    @type remoteRouteName: C{unicode} or L{NoneType}
    @ivar remoteRouteName: The name of the route which will be added to all
        boxes sent to this sender.  If C{None}, no route will be added.  Until
        L{connectTo} is called this holds a private "unspecified" marker and
        L{sendBox} refuses to send.
    """
    implements(IBoxSender)

    def connectTo(self, remoteRouteName):
        """
        Set the name of the route which will be added to outgoing boxes.

        @param remoteRouteName: The peer's name for this route, or C{None} to
            send boxes without any route key.
        """
        self.remoteRouteName = remoteRouteName
        # This route must not be started before its router is started.  If
        # sender is None, then the router is not started.  When the router is
        # started, it will start this route.
        if self.router._sender is not None:
            self.start()


    def unbind(self):
        """
        Remove the association between this route and its router.

        A route bound before the router was started is stored in the router's
        C{_unstarted} mapping rather than in C{_routes} (which is still
        C{None} at that point), so the mapping to delete from is chosen the
        same way L{Router.bindRoute} chooses where to insert.

        @raise KeyError: If this route is not (or no longer) bound.
        """
        routes = self.router._routes
        if routes is None:
            # Router not started yet: the route lives in the pending mapping.
            routes = self.router._unstarted
        del routes[self.localRouteName]


    def start(self):
        """
        Associate this object with a receiver as its L{IBoxSender}.
        """
        self.receiver.startReceivingBoxes(self)


    def stop(self, reason):
        """
        Shut down the underlying receiver.

        @param reason: Passed through unchanged to the receiver's
            C{stopReceivingBoxes}.
        """
        self.receiver.stopReceivingBoxes(reason)


    def sendBox(self, box):
        """
        Add the route and send the box.

        @param box: The AMP box to send.  It is modified in place: when a
            remote route name is set, it is stored (ASCII-encoded) under the
            route key before the box is handed to the router's sender.

        @raise RouteNotConnected: If L{connectTo} has not been called yet.
        """
        if self.remoteRouteName is _unspecified:
            raise RouteNotConnected()
        # None means "the default route": the box is sent without a route key.
        if self.remoteRouteName is not None:
            box[_ROUTE] = self.remoteRouteName.encode('ascii')
        self.router._sender.sendBox(box)


    def unhandledError(self, failure):
        """
        Pass failures through to the wrapped L{IBoxSender} without
        modification.
        """
        self.router._sender.unhandledError(failure)



class Router:
    """
    An L{IBoxReceiver} implementation which demultiplexes boxes from an AMP
    connection being used with zero, one, or more routes.

    @ivar _sender: An L{IBoxSender} provider which is used to allow
        L{IBoxReceiver}s added to this router to send boxes.  C{None} until
        C{startReceivingBoxes} is called.

    @ivar _unstarted: A C{dict} similar to C{_routes} set before
        C{startReceivingBoxes} is called and containing all routes which have
        been added but not yet started.  When the router is started this
        dict becomes C{_routes} and C{_unstarted} is set to C{None}.

    @ivar _routes: A C{dict} mapping local route identifiers to the
        L{Route}s associated with them.  This is only initialized after
        C{startReceivingBoxes} is called, and reset to C{None} by
        C{stopReceivingBoxes}.

    @ivar _routeCounter: A L{itertools.count} instance used to generate unique
        identifiers for routes in this router.
    """
    implements(IBoxReceiver)

    _routes = None
    _sender = None

    def __init__(self):
        self._routeCounter = count()
        self._unstarted = {}


    def createRouteIdentifier(self):
        """
        Return a route identifier which is not yet associated with a route on
        this dispatcher.

        @rtype: C{unicode}
        """
        return unicode(self._routeCounter.next())


    def bindRoute(self, receiver, routeName=_unspecified):
        """
        Create a new route to associate the given route name with the given
        receiver.

        @param receiver: The L{IBoxReceiver} which will be given boxes
            arriving for the new route.

        @type routeName: C{unicode} or L{NoneType}
        @param routeName: The identifier for the newly created route.  If
            C{None}, boxes with no route in them will be delivered to this
            receiver.  If omitted, a fresh identifier is generated with
            L{createRouteIdentifier}.

        @rtype: L{Route}
        """
        if routeName is _unspecified:
            routeName = self.createRouteIdentifier()
        # self._sender may yet be None; if so, this route goes into _unstarted
        # and will be started in startReceivingBoxes below.
        route = Route(self, receiver, routeName)
        mapping = self._routes
        if mapping is None:
            mapping = self._unstarted
        mapping[routeName] = route
        return route


    def startReceivingBoxes(self, sender):
        """
        Initialize route tracking objects and start every route which has
        already been connected.

        @param sender: The L{IBoxSender} which routes will use to send boxes.
        """
        self._sender = sender
        routes = self._unstarted
        # Publish the mapping before starting any receiver, so that a receiver
        # which binds further routes while it is being started adds them to
        # the live mapping instead of mutating a dict that is being iterated.
        self._routes = routes
        self._unstarted = None
        # Any route which has been bound but which does not yet have a remote
        # route name should not yet be started.  These will be started in
        # Route.connectTo.  The list is computed up front so that a route
        # connected (and therefore started by connectTo) while another
        # receiver starts is not started a second time here.
        ready = [route for route in routes.values()
                 if route.remoteRouteName is not _unspecified]
        for route in ready:
            route.start()


    def ampBoxReceived(self, box):
        """
        Dispatch the given box to the L{IBoxReceiver} associated with the route
        indicated by the box, or handle it directly if there is no route.

        The route key is removed from the box before it is passed on.  A box
        without a route key is delivered to the route bound as C{None}.

        @raise KeyError: If no route is bound under the box's route name.
        """
        route = box.pop(_ROUTE, None)
        self._routes[route].receiver.ampBoxReceived(box)


    def stopReceivingBoxes(self, reason):
        """
        Stop all the L{IBoxReceiver}s which have been added to this router.

        @param reason: Passed through unchanged to each route's C{stop}.
        """
        # Iterate over a snapshot: a receiver may unbind its route while it is
        # being stopped, which would otherwise change the dict mid-iteration.
        for route in list(self._routes.values()):
            route.stop(reason)
        self._routes = None



# RouteNotConnected is raised by Route.sendBox, so callers need it to catch
# that error; export it alongside the two classes.
__all__ = ['Router', 'Route', 'RouteNotConnected']