File: _timeoutwithmapper.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (134 lines) | stat: -rw-r--r-- 4,408 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from typing import Any, Callable, Optional, TypeVar

import reactivex
from reactivex import Observable, abc
from reactivex.disposable import (
    CompositeDisposable,
    SerialDisposable,
    SingleAssignmentDisposable,
)

_T = TypeVar("_T")


def timeout_with_mapper_(
    first_timeout: Optional[Observable[_T]] = None,
    timeout_duration_mapper: Optional[Callable[[Any], Observable[Any]]] = None,
    other: Optional[Observable[_T]] = None,
) -> Callable[[Observable[_T]], Observable[_T]]:
    """Returns the source observable sequence, switching to the other
    observable sequence if a timeout is signaled.

        res = timeout_with_mapper(reactivex.timer(500))
        res = timeout_with_mapper(reactivex.timer(500), lambda x: reactivex.timer(200))
        res = timeout_with_mapper(
            reactivex.timer(500),
            lambda x: reactivex.timer(200)),
            reactivex.return_value(42)
        )

    Args:
        first_timeout -- [Optional] Observable sequence that represents the
            timeout for the first element. If not provided, this defaults to
            reactivex.never().
        timeout_duration_mapper -- [Optional] Selector to retrieve an
            observable sequence that represents the timeout between the
            current element and the next element.
        other -- [Optional] Sequence to return in case of a timeout. If not
            provided, this is set to reactivex.throw().

    Returns:
        The source sequence switching to the other sequence in case
    of a timeout.
    """

    first_timeout_ = first_timeout or reactivex.never()
    other_ = other or reactivex.throw(Exception("Timeout"))

    def timeout_with_mapper(source: Observable[_T]) -> Observable[_T]:
        def subscribe(
            observer: abc.ObserverBase[_T],
            scheduler: Optional[abc.SchedulerBase] = None,
        ) -> abc.DisposableBase:
            subscription = SerialDisposable()
            timer = SerialDisposable()
            original = SingleAssignmentDisposable()

            subscription.disposable = original

            switched = False
            _id = [0]

            def set_timer(timeout: Observable[Any]) -> None:
                my_id = _id[0]

                def timer_wins():
                    return _id[0] == my_id

                d = SingleAssignmentDisposable()
                timer.disposable = d

                def on_next(x: Any) -> None:
                    if timer_wins():
                        subscription.disposable = other_.subscribe(
                            observer, scheduler=scheduler
                        )

                    d.dispose()

                def on_error(e: Exception) -> None:
                    if timer_wins():
                        observer.on_error(e)

                def on_completed() -> None:
                    if timer_wins():
                        subscription.disposable = other_.subscribe(observer)

                d.disposable = timeout.subscribe(
                    on_next, on_error, on_completed, scheduler=scheduler
                )

            set_timer(first_timeout_)

            def observer_wins():
                res = not switched
                if res:
                    _id[0] += 1

                return res

            def on_next(x: _T) -> None:
                if observer_wins():
                    observer.on_next(x)
                    timeout = None
                    try:
                        timeout = (
                            timeout_duration_mapper(x)
                            if timeout_duration_mapper
                            else reactivex.never()
                        )
                    except Exception as e:
                        observer.on_error(e)
                        return

                    set_timer(timeout)

            def on_error(error: Exception) -> None:
                if observer_wins():
                    observer.on_error(error)

            def on_completed() -> None:
                if observer_wins():
                    observer.on_completed()

            original.disposable = source.subscribe(
                on_next, on_error, on_completed, scheduler=scheduler
            )
            return CompositeDisposable(subscription, timer)

        return Observable(subscribe)

    return timeout_with_mapper


__all__ = ["timeout_with_mapper_"]