File: test_undefer.py

package info (click to toggle)
python-tubes 0.2.1-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 740 kB
  • sloc: python: 3,215; makefile: 149
file content (172 lines) | stat: -rw-r--r-- 5,217 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# -*- test-case-name: tubes.test.test_undefer -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{tubes.undefer}.
"""

from twisted.trial.unittest import SynchronousTestCase
from twisted.internet.defer import succeed
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure

from tubes.undefer import deferredToResult, fountToDeferred
from tubes.test.util import FakeDrain
from tubes.test.util import FakeFount
from tubes.tube import tube, series

class DeferredIntegrationTests(SynchronousTestCase):
    """
    Tests for L{deferredToResult}.
    """
    def setUp(self):
        """
        Create a fount and drain.
        """
        self.ff = FakeFount()
        self.fd = FakeDrain()


    def test_tubeYieldsFiredDeferred(self):
        """
        When a tube yields a fired L{Deferred} its result is synchronously
        delivered.
        """

        @tube
        class SucceedingTube(object):
            def received(self, data):
                yield succeed(''.join(reversed(data)))

        fakeDrain = self.fd
        self.ff.flowTo(series(SucceedingTube(),
                              deferredToResult())).flowTo(fakeDrain)
        self.ff.drain.receive("hello")
        self.assertEqual(self.fd.received, ["olleh"])


    def test_tubeYieldsUnfiredDeferred(self):
        """
        When a tube yields an unfired L{Deferred} its result is asynchronously
        delivered.
        """

        d = Deferred()

        @tube
        class WaitingTube(object):
            def received(self, data):
                yield d

        fakeDrain = self.fd
        self.ff.flowTo(series(WaitingTube(),
                              deferredToResult())).flowTo(fakeDrain)
        self.ff.drain.receive("ignored")
        self.assertEqual(self.fd.received, [])

        d.callback("hello")

        self.assertEqual(self.fd.received, ["hello"])


    def test_tubeYieldsMultipleDeferreds(self):
        """
        When a tube yields multiple deferreds their results should be delivered
        in order.
        """

        d = Deferred()

        @tube
        class MultiDeferredTube(object):
            didYield = False
            def received(self, data):
                yield d
                MultiDeferredTube.didYield = True
                yield succeed("goodbye")

        fakeDrain = self.fd
        self.ff.flowTo(series(MultiDeferredTube(),
                              deferredToResult())).flowTo(fakeDrain)
        self.ff.drain.receive("ignored")
        self.assertEqual(self.fd.received, [])

        d.callback("hello")

        self.assertEqual(self.fd.received, ["hello", "goodbye"])


    def test_tubeYieldedDeferredFiresWhileFlowIsPaused(self):
        """
        When a L{Tube} yields an L{Deferred} and that L{Deferred} fires when
        the L{_SiphonFount} is paused it should buffer it's result and deliver
        it when L{_SiphonFount.resumeFlow} is called.
        """
        d = Deferred()

        @tube
        class DeferredTube(object):
            def received(self, data):
                yield d

        fakeDrain = self.fd
        self.ff.flowTo(series(DeferredTube(),
                              deferredToResult())).flowTo(fakeDrain)
        self.ff.drain.receive("ignored")

        anPause = self.fd.fount.pauseFlow()

        d.callback("hello")
        self.assertEqual(self.fd.received, [])

        anPause.unpause()
        self.assertEqual(self.fd.received, ["hello"])


    def test_tubeStoppedDeferredly(self):
        """
        The L{_Siphon} stops its L{Tube} and propagates C{flowStopped}
        downstream upon the completion of all L{Deferred}s returned from its
        L{Tube}'s C{stopped} implementation.
        """
        reasons = []
        conclusion = Deferred()
        @tube
        class SlowEnder(object):
            def stopped(self, reason):
                reasons.append(reason)
                yield conclusion

        self.ff.flowTo(series(SlowEnder(), deferredToResult(), self.fd))
        self.assertEqual(reasons, [])
        self.assertEqual(self.fd.received, [])

        stopReason = Failure(ZeroDivisionError())

        self.ff.drain.flowStopped(stopReason)
        self.assertEqual(self.fd.received, [])
        self.assertEqual(len(reasons), 1)
        self.assertIdentical(reasons[0].type, ZeroDivisionError)
        self.assertEqual(self.fd.stopped, [])

        conclusion.callback("conclusion")
        # Now it's really done.
        self.assertEqual(self.fd.received, ["conclusion"])
        self.assertEqual(self.fd.stopped, [stopReason])


    def test_fountToDeferred(self):
        """
        L{fountToDeferred} returns a L{Deferred} that fires with an iterable of
        all the objects that the fount passed to it emits.
        """
        self.assertIsNone(self.ff.drain)
        d = fountToDeferred(self.ff)
        self.assertIsNotNone(self.ff.drain)
        self.assertNoResult(d)
        self.ff.drain.receive(1)
        self.assertNoResult(d)
        self.ff.drain.receive(2)
        self.ff.drain.flowStopped(Failure(ZeroDivisionError()))
        self.assertEqual(list(self.successResultOf(d)), [1, 2])