File: util.py

package info (click to toggle)
python-tubes 0.2.1-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 740 kB
  • sloc: python: 3,215; makefile: 149
file content (448 lines) | stat: -rw-r--r-- 10,628 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
# -*- test-case-name: tubes.test -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Utilities for testing L{tubes}.
"""

from zope.interface import Interface, implementer
from zope.interface.verify import verifyClass

from twisted.test.proto_helpers import StringTransport
from twisted.internet.defer import succeed
from twisted.internet.interfaces import (
    IStreamClientEndpoint, IStreamServerEndpoint, IListeningPort, IPushProducer
)

from tubes.itube import IDrain, IFount, IDivertable
from tubes.tube import tube
from tubes.kit import Pauser, beginFlowingFrom, beginFlowingTo

from twisted.internet.defer import Deferred

@implementer(IStreamClientEndpoint)
class StringEndpoint(object):
    """
    An in-memory L{IStreamClientEndpoint} for tests: every connection attempt
    is served by a fresh L{StringTransport} rather than a real socket.

    @ivar transports: one transport per call to L{StringEndpoint.connect}, in
        the order the connections were made.
    @type transports: L{list} of L{StringTransport}
    """
    def __init__(self):
        """
        Start out with no connections made.
        """
        self.transports = []


    def connect(self, factory):
        """
        Build a protocol from C{factory}, hook it up to a new
        L{StringTransport}, and remember that transport in C{transports}.

        @param factory: see L{IStreamClientEndpoint}

        @return: see L{IStreamClientEndpoint}; here, the result of
            L{succeed} called with the newly connected protocol.
        """
        # This fake has no peer address to offer, so the factory gets None.
        connectedProtocol = factory.buildProtocol(None)
        fakeTransport = StringTransport()
        fakeTransport.protocol = connectedProtocol
        connectedProtocol.makeConnection(fakeTransport)
        self.transports.append(fakeTransport)
        return succeed(connectedProtocol)



class IFakeOutput(Interface):
    """
    A sample interface to be used as an output marker for a fount.

    It declares no methods or attributes of its own.
    """



class IFakeInput(Interface):
    """
    A sample interface to be used as an input marker for a drain.

    It declares no methods or attributes of its own.
    """



@implementer(IFakeInput)
class FakeInput(object):
    """
    An implementation of a sample interface: a class declared to implement
    L{IFakeInput}, with no behavior of its own.
    """



@implementer(IDrain)
class FakeDrain(object):
    """
    An L{IDrain} test double which records everything delivered to it.

    @ivar fount: the fount this drain is attached to; C{None} (the class
        default) until one has been set.

    @ivar received: every item passed to L{FakeDrain.receive} so far, in
        order.
    @type received: L{list}

    @ivar stopped: every reason passed to L{FakeDrain.flowStopped} so far, in
        order.
    @type stopped: L{list}

    @ivar inputType: whatever was passed to the constructor as C{inputType}.
    """

    fount = None

    def __init__(self, inputType=None):
        """
        Create a drain that has received nothing and has not been stopped.

        @param inputType: stored unchanged as the C{inputType} attribute.
        """
        self.inputType = inputType
        self.stopped = []
        self.received = []


    def flowingFrom(self, fount):
        """
        Attach this drain to C{fount} by delegating to L{beginFlowingFrom}.

        @param fount: see L{IDrain}
        """
        # NOTE(review): beginFlowingFrom is presumably what assigns
        # self.fount; its return value is not propagated to the caller.
        beginFlowingFrom(self, fount)


    def receive(self, item):
        """
        Record C{item} at the end of L{FakeDrain.received}.

        @param item: see L{IDrain}

        @raise RuntimeError: if this drain currently has no fount.
        """
        if self.fount is not None:
            self.received.append(item)
            return
        # Delivering to a drain with no fount is treated as a test error.
        raise RuntimeError(
            "Invalid state: can't call receive on a drain "
            "when it's got no fount.")


    def flowStopped(self, reason):
        """
        Record C{reason} at the end of L{FakeDrain.stopped}.

        @param reason: see L{IDrain}
        """
        self.stopped.append(reason)


# Check at import time that the fake matches the interface it declares.
verifyClass(IDrain, FakeDrain)



@implementer(IFount)
class FakeFount(object):
    """
    An L{IFount} test double which keeps count of pauses and stops rather
    than producing any data.

    @ivar drain: the drain this fount is attached to; C{None} (the class
        default) until one has been set.

    @ivar flowIsPaused: incremented by L{FakeFount._actuallyPause} and
        decremented by L{FakeFount._actuallyResume}; starts at 0.
    @type flowIsPaused: L{int}

    @ivar flowIsStopped: C{False} until L{FakeFount.stopFlow} is called, after
        which it is the number of times C{stopFlow} has been called.

    @ivar outputType: whatever was passed to the constructor as
        C{outputType}.
    """
    drain = None

    flowIsPaused = 0
    flowIsStopped = False

    def __init__(self, outputType=None):
        """
        Create a fount that is neither paused nor stopped.

        @param outputType: stored unchanged as the C{outputType} attribute.
        """
        self.outputType = outputType
        # The Pauser is handed the two hooks below; subclasses customize
        # pausing by overriding those hooks rather than pauseFlow itself.
        self._pauser = Pauser(self._actuallyPause, self._actuallyResume)


    def flowTo(self, drain):
        """
        Attach this fount to C{drain} by delegating to L{beginFlowingTo}.

        @param drain: see L{IFount}

        @return: see L{IFount}; here, whatever L{beginFlowingTo} returns.
        """
        # The fount/drain cycle may be broken from either side, but whichever
        # side breaks it has to tell its peer, by calling flowingFrom() or
        # flowTo() with None, so the peer can release anything it holds on
        # the other's behalf (above all, so the drain lets go of its pauses).
        return beginFlowingTo(self, drain)


    def pauseFlow(self):
        """
        Request a pause of this fount through its L{Pauser}.

        @return: see L{IFount}; here, the result of the L{Pauser}'s
            C{pause} method.
        """
        return self._pauser.pause()


    def stopFlow(self):
        """
        Record that the flow was stopped by adding one to C{flowIsStopped}.
        """
        # The class default is False, so the first call leaves the integer 1.
        self.flowIsStopped = self.flowIsStopped + 1


    def _actuallyPause(self):
        """
        Hook given to the L{Pauser} for pausing: add one to C{flowIsPaused}.

        @note: this is overridden in subclasses to modify behavior.
        """
        self.flowIsPaused = self.flowIsPaused + 1


    def _actuallyResume(self):
        """
        Hook given to the L{Pauser} for resuming: subtract one from
        C{flowIsPaused}.

        @note: this is overridden in subclasses to modify behavior.
        """
        self.flowIsPaused = self.flowIsPaused - 1


# Check at import time that the fake matches the interface it declares.
verifyClass(IFount, FakeFount)



@tube
class TesterTube(object):
    """
    A tube for tests which remembers every input it is given.

    @ivar allReceivedItems: each item passed to L{TesterTube.received}, in
        order.
    @type allReceivedItems: L{list}
    """

    def __init__(self):
        """
        Start with an empty record of received items.
        """
        self.allReceivedItems = list()


    def received(self, item):
        """
        Received an item; remember it in C{allReceivedItems}.

        @param item: see L{ITube}
        """
        self.allReceivedItems.append(item)



@implementer(IDivertable)
class JustProvidesSwitchable(TesterTube):
    """
    A L{TesterTube} that just provides L{IDivertable} for tests that want
    to assert about interfaces (no implementation actually provided).

    Nothing is added beyond the interface declaration: all behavior is
    inherited from L{TesterTube}.
    """



@tube
@implementer(IDivertable)
class ReprTube(object):
    """
    A L{tubes.tube.tube} with a deterministic C{repr} for testing.

    It is declared to implement L{IDivertable} but defines no methods other
    than C{__repr__}.
    """
    def __repr__(self):
        """
        Return a fixed string, so tests can assert on representations that
        embed this tube.

        @return: the constant C{'<Tube for Testing>'}
        @rtype: L{str}
        """
        return '<Tube for Testing>'



@implementer(IDivertable)
@tube
class PassthruTube(object):
    """
    A L{tubes.tube.tube} which yields all of its input.
    """
    def received(self, data):
        """
        Produce all inputs as outputs.

        This is a generator: each call yields exactly one value, C{data}
        itself.

        @param data: the item delivered to this tube.

        @return: an iterator yielding C{data} once.
        """
        yield data


    def reassemble(self, data):
        """
        Reassemble any buffered outputs as inputs by simply returning them;
        valid since this tube takes the same input and output.

        @param data: see L{IDivertable}

        @return: C{data}, unchanged.
        """
        return data



class FakeFountWithBuffer(FakeFount):
    """
    A L{FakeFount} which holds on to items given to
    L{FakeFountWithBuffer.bufferUp} and delivers them to its drain whenever
    it has a drain and is not paused.

    Probably this should be replaced with a C{MemoryFount}.

    @ivar buffer: items awaiting delivery, oldest first.
    @type buffer: L{list}
    """
    def __init__(self, outputType=None):
        """
        Create a fount with nothing buffered.

        @param outputType: passed through to L{FakeFount}, so a buffered fount
            can advertise an output type just as a plain L{FakeFount} can.
        """
        super(FakeFountWithBuffer, self).__init__(outputType)
        self.buffer = []


    def bufferUp(self, item):
        """
        Buffer items for delivery on the next resume or flowTo.

        Nothing is delivered by this call itself, even if a drain is attached
        and the flow is not paused.

        @param item: the item to deliver later.
        """
        self.buffer.append(item)


    def flowTo(self, drain):
        """
        Attach to the given drain, then flush buffered items to it as long as
        we're not paused.

        @param drain: The drain to flush to, or C{None} to detach; when
            detaching, buffered items are kept for a later drain.

        @return: The result of flowing to the given drain.
        """
        result = super(FakeFountWithBuffer, self).flowTo(drain)
        self._go()
        return result


    def _actuallyResume(self):
        """
        Resume and unbuffer any items as long as we're not paused.
        """
        super(FakeFountWithBuffer, self)._actuallyResume()
        self._go()


    def _go(self):
        """
        Deliver buffered items, oldest first, for as long as there is a drain
        to deliver them to and the flow is not paused.

        With no drain attached (never attached, or detached via
        C{flowTo(None)}) the items stay in the buffer instead of causing an
        C{AttributeError} on C{None}.
        """
        # All three conditions are re-checked on every pass because
        # drain.receive() may itself pause this fount or detach the drain.
        while (self.drain is not None and not self.flowIsPaused
               and self.buffer):
            item = self.buffer.pop(0)
            self.drain.receive(item)



@tube
class NullTube(object):
    """
    An L{ITube} that does nothing when inputs are received.

    The class body defines no methods at all; any behavior comes from the
    L{tube} decorator.
    """



@implementer(IListeningPort, IPushProducer)
class FakeListeningProducerPort(object):
    """
    This is a fake L{IListeningPort}, also implementing L{IPushProducer}, which
    L{flowFountFromEndpoint} needs to make backpressure work.

    @ivar factory: the protocol factory this port was created with.

    @ivar stopper: the L{Deferred} handed out by
        L{FakeListeningProducerPort.stopListening}; this class never fires it
        itself.
    @type stopper: L{Deferred}

    @ivar listenStopping: C{True} once C{stopListening} has been called.
    @type listenStopping: L{bool}

    @ivar currentlyProducing: C{False} after C{pauseProducing}, C{True}
        initially and after C{resumeProducing}.
    @type currentlyProducing: L{bool}
    """
    def __init__(self, factory):
        """
        Create a L{FakeListeningProducerPort} with the given protocol
        factory; it starts out producing and not stopping.

        @param factory: stored unchanged as the C{factory} attribute.
        """
        self.currentlyProducing = True
        self.listenStopping = False
        self.stopper = Deferred()
        self.factory = factory


    def pauseProducing(self):
        """
        Record that new connections should no longer be produced.
        """
        self.currentlyProducing = False


    def resumeProducing(self):
        """
        Record that new connections may be produced again.
        """
        self.currentlyProducing = True


    def startListening(self):
        """
        Start listening on this port.

        This fake has nothing to bind, so the method does nothing and never
        raises.
        """


    def stopListening(self):
        """
        Begin stopping this fake port by setting C{listenStopping}.

        @return: the C{stopper} L{Deferred}, which the test should fire when
            it wants stopping to complete.
        """
        self.listenStopping = True
        return self.stopper


    def stopProducing(self):
        """
        Stop producing more data, which for a port means to stop listening.
        """
        # The Deferred returned by stopListening is deliberately not used.
        self.stopListening()


    def getHost(self):
        """
        Get the host that this port is listening for.

        @return: C{None}; this fake has no address to report.
        """


# Check at import time that the fake matches both interfaces it declares.
verifyClass(IListeningPort, FakeListeningProducerPort)
verifyClass(IPushProducer, FakeListeningProducerPort)


@implementer(IStreamServerEndpoint)
class FakeEndpoint(object):
    """
    A fake L{IStreamServerEndpoint} whose listening attempts complete only
    when the test says so.

    @ivar _listening: one L{Deferred} per call to L{FakeEndpoint.listen}, in
        call order.  Invoking C{.callback} on one of them (the value passed
        is ignored) completes that listening attempt.
    @type _listening: L{list} of L{Deferred}

    @ivar _ports: the ports created so far, i.e. those whose listening
        attempt has been completed.
    @type _ports: L{list} of L{IListeningPort}
    """
    def __init__(self):
        """
        Create a L{FakeEndpoint} with no listening attempts and no ports.
        """
        self._ports = []
        self._listening = []


    def listen(self, factory):
        """
        Register a new, not-yet-completed listening attempt for C{factory}.

        @param factory: The factory to use for future connections.

        @return: a L{Deferred} that fires with a new
            L{FakeListeningProducerPort} once the test fires the matching
            entry of C{_listening}.
        """
        def portCreated(ignored):
            # Runs only when the test fires the deferred: create the port,
            # record it, and pass it on as the deferred's result.
            port = FakeListeningProducerPort(factory)
            self._ports.append(port)
            return port

        pending = Deferred()
        self._listening.append(pending)
        return pending.addCallback(portCreated)



def fakeEndpointWithPorts():
    """
    Build a new L{FakeEndpoint} and hand back, alongside it, the very list
    object in which it records its ports.

    Because the list is shared rather than copied, ports created by the
    endpoint later on show up in it.

    @return: a fake endpoint and a list of the ports it has listened on
    @rtype: a 2-tuple of C{(endpoint, ports)}, where C{ports} is a L{list} of
        L{IListeningPort}.
    """
    endpoint = FakeEndpoint()
    ports = endpoint._ports
    return endpoint, ports

# Check at import time that FakeEndpoint matches the interface it declares.
verifyClass(IStreamServerEndpoint, FakeEndpoint)