File: test_memory.py

package info (click to toggle)
python-tubes 0.2.1-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 740 kB
  • sloc: python: 3,215; makefile: 149
file content (187 lines) | stat: -rw-r--r-- 5,241 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# -*- test-case-name: tubes.test.test_memory -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{tubes.memory}.
"""

from twisted.trial.unittest import SynchronousTestCase

from zope.interface.verify import verifyObject

from tubes.memory import iteratorFount

from tubes.itube import IFount, StopFlowCalled

from .util import FakeDrain, FakeFount

class DrainThatStops(FakeDrain):
    """
    A L{FakeDrain} that calls C{stopFlow} on its fount whenever it is given
    an item.
    """
    # TODO: possibly promote to util
    def receive(self, item):
        """
        Hand the item to the inherited C{receive}, then stop the fount.

        @param item: any object.
        """
        inheritedReceive = super(DrainThatStops, self).receive
        inheritedReceive(item)
        # Look the fount up only after the item has been handled, as the
        # inherited implementation runs first.
        upstream = self.fount
        upstream.stopFlow()



class DrainThatPauses(FakeDrain):
    """
    A L{FakeDrain} that calls C{pauseFlow} on its fount whenever it is given
    an item.

    @ivar pause: the value returned by the most recent C{pauseFlow} call, kept
        so that callers can unpause it later.
    """
    # TODO: possibly promote to util
    def receive(self, item):
        """
        Hand the item to the inherited C{receive}, then pause the fount.

        @param item: any object.
        """
        inheritedReceive = super(DrainThatPauses, self).receive
        inheritedReceive(item)
        upstream = self.fount
        # Keep hold of the pause so it can be released by whoever owns us.
        self.pause = upstream.pauseFlow()



class IteratorFountTests(SynchronousTestCase):
    """
    Unit tests exercising L{tubes.memory.iteratorFount}.
    """

    def test_flowTo(self):
        """
        L{iteratorFount.flowTo} records the drain it was given, invokes that
        drain's C{flowingFrom} with itself, and hands back whatever
        C{flowingFrom} returned.
        """
        source = iteratorFount([])
        downstream = FakeFount()

        class ContinuingDrain(FakeDrain):
            def flowingFrom(self, fount):
                super(ContinuingDrain, self).flowingFrom(fount)
                return downstream

        sink = ContinuingDrain()
        returned = source.flowTo(sink)

        self.assertIdentical(sink.fount, source)
        self.assertIdentical(source.drain, sink)
        self.assertIdentical(returned, downstream)


    def test_flowToDeliversValues(self):
        """
        Every value produced by the iterable ends up at the drain passed to
        L{iteratorFount.flowTo}.
        """
        sink = FakeDrain()
        source = iteratorFount([1, 2, 3])
        source.flowTo(sink)
        delivered = sink.received
        self.assertEqual(delivered, [1, 2, 3])


    def test_pauseFlow(self):
        """
        Once L{iteratorFount.pauseFlow} has been called, no further items are
        delivered.
        """
        sink = DrainThatPauses()
        source = iteratorFount([1, 2, 3])
        source.flowTo(sink)
        # The drain pauses on its first item, so only that one gets through.
        delivered = sink.received
        self.assertEqual(delivered, [1])


    def test_unpauseFlow(self):
        """
        Delivery resumes only after every pause obtained from
        L{iteratorFount.pauseFlow} has been unpaused.
        """
        source = iteratorFount([1, 2, 3])
        sink = FakeDrain()
        outstanding = [source.pauseFlow() for _ in range(2)]
        source.flowTo(sink)
        self.assertEqual(sink.received, [])
        # Releasing one of two pauses must not restart delivery.
        outstanding.pop().unpause()
        self.assertEqual(sink.received, [])
        # Releasing the last one lets everything through.
        outstanding.pop().unpause()
        self.assertEqual(sink.received, [1, 2, 3])


    def test_stopFlow(self):
        """
        L{iteratorFount.stopFlow} ends delivery at once and notifies the drain
        through C{flowStopped}, with L{StopFlowCalled} as the reason.
        """
        sink = DrainThatStops()
        source = iteratorFount([1, 2, 3])
        source.flowTo(sink)

        self.assertEqual(sink.received, [1])
        reason = sink.stopped[0]
        self.assertEqual(reason.type, StopFlowCalled)


    def test_stopIterationStopsIteration(self):
        """
        Once the iterator given to L{iteratorFount} runs out,
        L{IDrain.flowStopped} is called with L{StopIteration} as its reason.
        """
        sink = FakeDrain()
        source = iteratorFount([1, 2, 3])
        source.flowTo(sink)

        self.assertEqual(sink.received, [1, 2, 3])
        reason = sink.stopped[0]
        self.assertEqual(reason.type, StopIteration)


    def test_stopFlowCalledAfterFlowStopped(self):
        """
        Calling C{stopFlow} on an L{iteratorFount} whose iterator has already
        been exhausted does not produce a second L{flowStopped} call on its
        C{drain}.
        """
        sink = FakeDrain()
        source = iteratorFount([1])
        source.flowTo(sink)
        self.assertEqual(sink.received, [1])
        self.assertEqual(len(sink.stopped), 1)

        source.stopFlow()

        # Still exactly one notification, and it is the original one.
        self.assertEqual(len(sink.stopped), 1)
        reason = sink.stopped[0]
        self.assertEqual(reason.type, StopIteration)


    def test_stopPausedFlow(self):
        """
        Stopping a paused L{iteratorFount} defers the drain's C{flowStopped}
        notification until the fount is resumed.
        """
        sink = DrainThatPauses()
        source = iteratorFount([1, 2])
        source.flowTo(sink)
        source.stopFlow()

        # While paused: one item seen, no stop notification yet.
        self.assertEqual(sink.received, [1])
        self.assertEqual(len(sink.stopped), 0)

        sink.pause.unpause()

        self.assertEqual(len(sink.stopped), 1)
        reason = sink.stopped[0]
        self.assertEqual(reason.type, StopFlowCalled)


    def test_flowUnpausedAfterPausedFlowIsStopped(self):
        """
        Unpausing an L{iteratorFount} that was stopped while paused does not
        cause any more items to be delivered.
        """
        sink = DrainThatPauses()
        source = iteratorFount([1, 2])
        source.flowTo(sink)
        source.stopFlow()
        sink.pause.unpause()
        delivered = sink.received
        self.assertEqual(delivered, [1])


    def test_provides(self):
        """
        Instances of L{iteratorFount} provide L{IFount}.
        """
        source = iteratorFount([])
        verifyObject(IFount, source)