1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
|
# -*- test-case-name: tubes.test.test_memory -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{tubes.memory}.
"""
from twisted.trial.unittest import SynchronousTestCase
from zope.interface.verify import verifyObject
from tubes.memory import iteratorFount
from tubes.itube import IFount, StopFlowCalled
from .util import FakeDrain, FakeFount
class DrainThatStops(FakeDrain):
"""
Drain that stops its fount upon receive.
"""
# TODO: possibly promote to util
def receive(self, item):
"""
Receive one item and stop.
@param item: any object.
"""
super(DrainThatStops, self).receive(item)
self.fount.stopFlow()
class DrainThatPauses(FakeDrain):
"""
Drain that pauses its fount upon receive.
"""
# TODO: possibly promote to util
def receive(self, item):
"""
Receive one item and pause.
@param item: any object.
"""
super(DrainThatPauses, self).receive(item)
self.pause = self.fount.pauseFlow()
class IteratorFountTests(SynchronousTestCase):
"""
Tests for L{tubes.memory.iteratorFount}.
"""
def test_flowTo(self):
"""
L{iteratorFount.flowTo} sets its drain and calls C{flowingFrom} on its
argument, returning that value.
"""
f = iteratorFount([])
ff = FakeFount()
class FakeDrainThatContinues(FakeDrain):
def flowingFrom(self, fount):
super(FakeDrainThatContinues, self).flowingFrom(fount)
return ff
fd = FakeDrainThatContinues()
result = f.flowTo(fd)
self.assertIdentical(fd.fount, f)
self.assertIdentical(f.drain, fd)
self.assertIdentical(result, ff)
def test_flowToDeliversValues(self):
"""
L{iteratorFount.flowTo} will deliver all of its values to the given
drain.
"""
f = iteratorFount([1, 2, 3])
fd = FakeDrain()
f.flowTo(fd)
self.assertEqual(fd.received, [1, 2, 3])
def test_pauseFlow(self):
"""
L{iteratorFount.pauseFlow} will pause the delivery of items.
"""
f = iteratorFount([1, 2, 3])
fd = DrainThatPauses()
f.flowTo(fd)
self.assertEqual(fd.received, [1])
def test_unpauseFlow(self):
"""
When all pauses returned by L{iteratorFount.pauseFlow} have been
unpaused, the flow resumes.
"""
f = iteratorFount([1, 2, 3])
fd = FakeDrain()
pauses = [f.pauseFlow(), f.pauseFlow()]
f.flowTo(fd)
self.assertEqual(fd.received, [])
pauses.pop().unpause()
self.assertEqual(fd.received, [])
pauses.pop().unpause()
self.assertEqual(fd.received, [1, 2, 3])
def test_stopFlow(self):
"""
L{iteratorFount.stopFlow} stops the flow, propagating a C{flowStopped}
call to its drain and ceasing delivery immediately.
"""
f = iteratorFount([1, 2, 3])
fd = DrainThatStops()
f.flowTo(fd)
self.assertEqual(fd.received, [1])
self.assertEqual(fd.stopped[0].type, StopFlowCalled)
def test_stopIterationStopsIteration(self):
"""
When the iterator passed to L{iteratorFount} is exhausted
L{IDrain.flowStopped} is called with L{StopIteration} as it's
reason.
"""
f = iteratorFount([1, 2, 3])
fd = FakeDrain()
f.flowTo(fd)
self.assertEqual(fd.received, [1, 2, 3])
self.assertEqual(fd.stopped[0].type, StopIteration)
def test_stopFlowCalledAfterFlowStopped(self):
"""
L{iteratorFount} will only call its C{drain}'s L{flowStopped} once when
C{stopFlow} is called after the flow has stopped due to iterator
exhaustion.
"""
f = iteratorFount([1])
fd = FakeDrain()
f.flowTo(fd)
self.assertEqual(fd.received, [1])
self.assertEqual(len(fd.stopped), 1)
f.stopFlow()
self.assertEqual(len(fd.stopped), 1)
self.assertEqual(fd.stopped[0].type, StopIteration)
def test_stopPausedFlow(self):
"""
When L{iteratorFount} is stopped after being paused, the drain will
receive a C{flowStopped} when it is resumed.
"""
f = iteratorFount([1, 2])
fd = DrainThatPauses()
f.flowTo(fd)
f.stopFlow()
self.assertEqual(fd.received, [1])
self.assertEqual(len(fd.stopped), 0)
fd.pause.unpause()
self.assertEqual(len(fd.stopped), 1)
self.assertEqual(fd.stopped[0].type, StopFlowCalled)
def test_flowUnpausedAfterPausedFlowIsStopped(self):
"""
When L{iteratorFount} is stopped after being paused, and subsequently
unpaused it should not start flowing again.
"""
f = iteratorFount([1, 2])
fd = DrainThatPauses()
f.flowTo(fd)
f.stopFlow()
fd.pause.unpause()
self.assertEqual(fd.received, [1])
def test_provides(self):
"""
An L{iteratorFount} provides L{IFount}.
"""
verifyObject(IFount, iteratorFount([]))
|