File: test_returnvalue.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (83 lines) | stat: -rw-r--r-- 2,315 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import unittest

import reactivex
from reactivex.disposable import SerialDisposable
from reactivex.testing import ReactiveTest, TestScheduler

on_next = ReactiveTest.on_next
on_completed = ReactiveTest.on_completed
on_error = ReactiveTest.on_error
subscribe = ReactiveTest.subscribe
subscribed = ReactiveTest.subscribed
disposed = ReactiveTest.disposed
created = ReactiveTest.created


class RxException(Exception):
    pass


# Helper function for raising exceptions within lambdas
def _raise(ex):
    raise RxException(ex)


class TestReturnValue(unittest.TestCase):
    def test_return_basic(self):
        scheduler = TestScheduler()

        def factory():
            return reactivex.return_value(42)

        results = scheduler.start(factory)
        assert results.messages == [on_next(200, 42), on_completed(200)]

    def test_return_disposed(self):
        scheduler = TestScheduler()

        def factory():
            return reactivex.return_value(42)

        results = scheduler.start(factory, disposed=200)
        assert results.messages == []

    def test_return_disposed_after_next(self):
        scheduler = TestScheduler()
        d = SerialDisposable()
        xs = reactivex.return_value(42)
        results = scheduler.create_observer()

        def action(scheduler, state):
            def on_next(x):
                d.dispose()
                results.on_next(x)

            def on_error(e):
                results.on_error(e)

            def on_completed():
                results.on_completed()

            d.disposable = xs.subscribe(
                on_next, on_error, on_completed, scheduler=scheduler
            )
            return d.disposable

        scheduler.schedule_absolute(100, action)
        scheduler.start()
        assert results.messages == [on_next(100, 42)]

    def test_return_observer_throws(self):
        scheduler1 = TestScheduler()
        xs = reactivex.return_value(1)
        xs.subscribe(lambda x: _raise("ex"), scheduler=scheduler1)

        self.assertRaises(RxException, scheduler1.start)

        scheduler2 = TestScheduler()
        ys = reactivex.return_value(1)
        ys.subscribe(
            lambda x: x, lambda ex: ex, lambda: _raise("ex"), scheduler=scheduler2
        )

        self.assertRaises(RxException, scheduler2.start)