# -*- test-case-name: tubes.test.test_undefer -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{tubes.undefer}.
"""

from twisted.trial.unittest import SynchronousTestCase
from twisted.internet.defer import succeed
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure

from tubes.undefer import deferredToResult, fountToDeferred
from tubes.test.util import FakeDrain
from tubes.test.util import FakeFount
from tubes.tube import tube, series

class DeferredIntegrationTests(SynchronousTestCase):
    """
    Tests for L{deferredToResult}.
    """
    def setUp(self):
        """
        Create a fount and drain.
        """
        self.ff = FakeFount()
        self.fd = FakeDrain()


    def test_tubeYieldsFiredDeferred(self):
        """
        When a tube yields a fired L{Deferred} its result is synchronously
        delivered.
        """

        @tube
        class SucceedingTube(object):
            def received(self, data):
                yield succeed(''.join(reversed(data)))

        fakeDrain = self.fd
        self.ff.flowTo(series(SucceedingTube(),
                              deferredToResult())).flowTo(fakeDrain)
        self.ff.drain.receive("hello")
        self.assertEqual(self.fd.received, ["olleh"])


    def test_tubeYieldsUnfiredDeferred(self):
        """
        When a tube yields an unfired L{Deferred} its result is asynchronously
        delivered.
        """

        d = Deferred()

        @tube
        class WaitingTube(object):
            def received(self, data):
                yield d

        fakeDrain = self.fd
        self.ff.flowTo(series(WaitingTube(),
                              deferredToResult())).flowTo(fakeDrain)
        self.ff.drain.receive("ignored")
        self.assertEqual(self.fd.received, [])

        d.callback("hello")

        self.assertEqual(self.fd.received, ["hello"])


    def test_tubeYieldsMultipleDeferreds(self):
        """
        When a tube yields multiple deferreds their results should be delivered
        in order.
        """

        d = Deferred()

        @tube
        class MultiDeferredTube(object):
            didYield = False
            def received(self, data):
                yield d
                MultiDeferredTube.didYield = True
                yield succeed("goodbye")

        fakeDrain = self.fd
        self.ff.flowTo(series(MultiDeferredTube(),
                              deferredToResult())).flowTo(fakeDrain)
        self.ff.drain.receive("ignored")
        self.assertEqual(self.fd.received, [])

        d.callback("hello")

        self.assertEqual(self.fd.received, ["hello", "goodbye"])


    def test_tubeYieldedDeferredFiresWhileFlowIsPaused(self):
        """
        When a L{Tube} yields an L{Deferred} and that L{Deferred} fires when
        the L{_SiphonFount} is paused it should buffer it's result and deliver
        it when L{_SiphonFount.resumeFlow} is called.
        """
        d = Deferred()

        @tube
        class DeferredTube(object):
            def received(self, data):
                yield d

        fakeDrain = self.fd
        self.ff.flowTo(series(DeferredTube(),
                              deferredToResult())).flowTo(fakeDrain)
        self.ff.drain.receive("ignored")

        anPause = self.fd.fount.pauseFlow()

        d.callback("hello")
        self.assertEqual(self.fd.received, [])

        anPause.unpause()
        self.assertEqual(self.fd.received, ["hello"])


    def test_tubeStoppedDeferredly(self):
        """
        The L{_Siphon} stops its L{Tube} and propagates C{flowStopped}
        downstream upon the completion of all L{Deferred}s returned from its
        L{Tube}'s C{stopped} implementation.
        """
        reasons = []
        conclusion = Deferred()
        @tube
        class SlowEnder(object):
            def stopped(self, reason):
                reasons.append(reason)
                yield conclusion

        self.ff.flowTo(series(SlowEnder(), deferredToResult(), self.fd))
        self.assertEqual(reasons, [])
        self.assertEqual(self.fd.received, [])

        stopReason = Failure(ZeroDivisionError())

        self.ff.drain.flowStopped(stopReason)
        self.assertEqual(self.fd.received, [])
        self.assertEqual(len(reasons), 1)
        self.assertIdentical(reasons[0].type, ZeroDivisionError)
        self.assertEqual(self.fd.stopped, [])

        conclusion.callback("conclusion")
        # Now it's really done.
        self.assertEqual(self.fd.received, ["conclusion"])
        self.assertEqual(self.fd.stopped, [stopReason])


    def test_fountToDeferred(self):
        """
        L{fountToDeferred} returns a L{Deferred} that fires with an iterable of
        all the objects that the fount passed to it emits.
        """
        self.assertIsNone(self.ff.drain)
        d = fountToDeferred(self.ff)
        self.assertIsNotNone(self.ff.drain)
        self.assertNoResult(d)
        self.ff.drain.receive(1)
        self.assertNoResult(d)
        self.ff.drain.receive(2)
        self.ff.drain.flowStopped(Failure(ZeroDivisionError()))
        self.assertEqual(list(self.successResultOf(d)), [1, 2])
