File: test_fromfuture.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (105 lines) | stat: -rw-r--r-- 2,590 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import asyncio
import unittest
from asyncio import Future

import reactivex


class TestFromFuture(unittest.TestCase):
    def test_future_success(self):
        loop = asyncio.get_event_loop()
        success = [False, True, False]

        async def go():
            future = Future()
            future.set_result(42)

            source = reactivex.from_future(future)

            def on_next(x):
                success[0] = x == 42

            def on_error(err):
                success[1] = False

            def on_completed():
                success[2] = True

            source.subscribe(on_next, on_error, on_completed)

        loop.run_until_complete(go())
        assert all(success)

    def test_future_failure(self):
        loop = asyncio.get_event_loop()
        success = [True, False, True]

        async def go():
            error = Exception("woops")

            future = Future()
            future.set_exception(error)

            source = reactivex.from_future(future)

            def on_next(x):
                success[0] = False

            def on_error(err):
                success[1] = str(err) == str(error)

            def on_completed():
                success[2] = False

            source.subscribe(on_next, on_error, on_completed)

        loop.run_until_complete(go())
        assert all(success)

    def test_future_cancel(self):
        loop = asyncio.get_event_loop()
        success = [True, False, True]

        async def go():
            future = Future()
            source = reactivex.from_future(future)

            def on_next(x):
                success[0] = False

            def on_error(err):
                success[1] = type(err) == asyncio.CancelledError

            def on_completed():
                success[2] = False

            source.subscribe(on_next, on_error, on_completed)
            future.cancel()

        loop.run_until_complete(go())
        assert all(success)

    def test_future_dispose(self):
        loop = asyncio.get_event_loop()
        success = [True, True, True]

        async def go():
            future = Future()
            future.set_result(42)

            source = reactivex.from_future(future)

            def on_next(x):
                success[0] = False

            def on_error(err):
                success[1] = False

            def on_completed():
                success[2] = False

            subscription = source.subscribe(on_next, on_error, on_completed)
            subscription.dispose()

        loop.run_until_complete(go())
        assert all(success)