1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
|
# -*- test-case-name: tubes.test.test_routing -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
A L{Router} receives items with addressing information and dispatches them to
an appropriate output, stripping the addressing information off.
Use like so::
from tubes.tube import receiver, series
from tubes.routing import Router, to
aRouter = Router()
evens = aRouter.newRoute()
odds = aRouter.newRoute()
@receiver()
def evenOdd(item):
if (item % 2) == 0:
yield to(evens, item)
else:
yield to(odds, item)
numbers.flowTo(series(evenOdd, aRouter.drain))
Assuming C{numbers} is a fount of counting integers, this creates two founts:
C{evens} and C{odds}, whose outputs are even and odd integers, respectively.
Note that C{evenOdd} also uses C{evens} and C{odds} as I{addresses}; the first
argument to L{to} says I{where} the value will go.
Why do this rather than just having C{evenOdd} just call methods directly based
on whether a number is even or odd?
By using a L{Router}, flow control relationships are automatically preserved by
the same mechanism that tubes usually use. The distinct drains of C{evens} and
C{odds} can both independently pause their founts, and the pause state will be
propagated to the "numbers" fount. If you want to send on outputs to multiple
drains which may have complex flow-control interrelationships, you can't do
that by calling the C{receive} method directly since any one of those methods
might reentrantly pause its fount.
"""
from zope.interface import implementer
from .tube import receiver, series
from .itube import IDrain
from .fan import Out
from .kit import beginFlowingFrom
if 0:
from zope.interface.interfaces import ISpecification
ISpecification
__all__ = [
"Router",
"Routed",
"to",
]
class Routed(object):
"""
A L{Routed} is a specification describing another specification that has
been wrapped in a C{to}. As such, it is an incomplete implementation of
L{ISpecification}.
"""
def __init__(self, specification=None):
"""
Derive a L{Routed} version of C{specification}.
@param specification: the specification that will be provided by the
C{what} attribute of providers of this specification.
@type specification: L{ISpecification}
"""
self.specification = specification
def isOrExtends(self, other):
"""
Is this L{Routed} substitutable for the given specification?
@param other: Another L{Routed} or specification.
@type other: L{ISpecification}
@return: L{True} if so, L{False} if not.
"""
if not isinstance(other, Routed):
return False
if self.specification is None or other.specification is None:
return True
return self.specification.isOrExtends(other.specification)
def providedBy(self, instance):
"""
Is this L{Routed} provided by a particular value?
@param instance: an object which may or may not provide this
specification.
@type instance: L{object}
@return: L{True} if so, L{False} if not.
@rtype: L{bool}
"""
if not isinstance(instance, _To):
return False
if self.specification is None:
return True
return self.specification.providedBy(instance._what)
def __eq__(self, other):
"""
Routed(X) compares equal to Routed(X).
"""
if not isinstance(other, Routed):
return NotImplemented
return self.specification == other.specification
def __ne__(self, other):
"""
Routed(X) compares unequal to Routed(Y).
"""
if not isinstance(other, Routed):
return NotImplemented
return self.specification != other.specification
class _To(object):
"""
An object destined for a specific destination.
"""
def __init__(self, where, what):
"""
Create a L{_To} to a particular route with a given value.
@param _where: see L{to}
@param _what: see L{to}
"""
self._where = where
self._what = what
def __repr__(self):
"""
@return: an explanatory string.
"""
return "to({!r}, {!r})".format(self._where, self._what)
def to(where, what):
"""
Construct a provider of L{Routed}C{(providedBy(where))}.
@see: L{tubes.routing}
@param where: A fount returned from L{Router.newRoute}. This must be
I{exactly} the return value of L{Router.newRoute}, as it is compared by
object identity and not by any feature of L{IFount}.
@param what: the value to deliver.
@return: a L{Routed} object.
"""
return _To(where, what)
class Router(object):
"""
A drain with multiple founts that consumes L{Routed}C{(IX)} from its input
and produces C{IX} to its outputs.
@ivar _out: A fan-out that consumes L{Routed}C{(X)} and produces C{X}.
@type _out: L{Out}
@ivar drain: The input to this L{Router}.
@type drain: L{IDrain}
"""
def __init__(self, outputType=None):
self._out = Out()
self._outputType = outputType
@implementer(IDrain)
class NullDrain(object):
inputType = outputType
fount = None
def flowingFrom(self, fount):
beginFlowingFrom(self, fount)
def receive(self, item):
pass
def flowStopped(self, reason):
pass
self.newRoute("NULL").flowTo(NullDrain())
self.drain = self._out.drain
def newRoute(self, name=None):
"""
Create a new route.
A route has two uses; first, it is an L{IFount} that you can flow to a
drain.
Second, it is the "where" parameter passed to L{to}. Each value sent
to L{Router.drain} should be a L{to} constructed with a value returned
from this method as the "where" parameter.
@param name: Give the route a name for debugging purposes.
@type name: native L{str}
@return: L{IFount}
"""
@receiver(inputType=Routed(self._outputType),
outputType=self._outputType,
name=name)
def received(item):
if not isinstance(item, _To):
raise TypeError("{0} is not routed".format(item))
if item._where is fount:
yield item._what
fount = self._out.newFount().flowTo(series(received))
return fount
|