File: _catch.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (78 lines) | stat: -rw-r--r-- 2,575 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from asyncio import Future
from typing import Callable, Optional, TypeVar, Union

import reactivex
from reactivex import Observable, abc
from reactivex.disposable import SerialDisposable, SingleAssignmentDisposable

# Element type shared by the source observable, the handler's result and the
# observable returned by the operators in this module.
_T = TypeVar("_T")


def catch_handler(
    source: Observable[_T],
    handler: Callable[[Exception, Observable[_T]], Union[Observable[_T], "Future[_T]"]],
) -> Observable[_T]:
    """Build an observable that continues with a handler-supplied sequence on error.

    Args:
        source: Sequence that is subscribed to first.
        handler: Called as ``handler(exception, source)`` when ``source``
            reports an error. Its return value is the sequence to continue
            with; an ``asyncio`` ``Future`` is converted with
            ``reactivex.from_future`` before being subscribed to.

    Returns:
        An observable that relays ``source``'s items and completion to the
        observer and, if ``source`` errors, subscribes the same observer to
        whatever ``handler`` returned. An exception raised by ``handler``
        itself is passed to the observer's ``on_error``.
    """

    def subscribe(
        observer: abc.ObserverBase[_T], scheduler: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        # The serial disposable is what the subscriber gets back. It starts
        # out holding the source subscription and is re-pointed at the
        # fallback subscription once the handler has produced a sequence.
        serial = SerialDisposable()
        source_subscription = SingleAssignmentDisposable()
        serial.disposable = source_subscription

        def recover(exception: Exception) -> None:
            try:
                outcome = handler(exception, source)
            except Exception as ex:  # By design. pylint: disable=W0703
                # Report the handler's own failure to the observer rather
                # than the error that triggered the handler.
                observer.on_error(ex)
                return

            if isinstance(outcome, Future):
                fallback = reactivex.from_future(outcome)
            else:
                fallback = outcome

            # Install the placeholder in the serial disposable before
            # subscribing, the same way the source subscription is wired up.
            fallback_subscription = SingleAssignmentDisposable()
            serial.disposable = fallback_subscription
            fallback_subscription.disposable = fallback.subscribe(
                observer, scheduler=scheduler
            )

        source_subscription.disposable = source.subscribe(
            observer.on_next, recover, observer.on_completed, scheduler=scheduler
        )
        return serial

    return Observable(subscribe)


def catch_(
    handler: Union[
        Observable[_T], Callable[[Exception, Observable[_T]], Observable[_T]]
    ]
) -> Callable[[Observable[_T]], Observable[_T]]:
    def catch(source: Observable[_T]) -> Observable[_T]:
        """Resume a sequence that ended with an exception using another
        observable sequence.

        Examples:
            >>> op = catch(ys)
            >>> op = catch(lambda ex, src: ys(ex))

        Args:
            handler: Either the observable sequence to continue with once
                the first sequence has raised an error, or a callable that
                receives the error and the source observable and returns
                the sequence to continue with.

        Returns:
            An observable sequence made up of the first sequence's
            elements, followed by the elements of the handler sequence
            if an exception occurred.
        """
        # A non-callable handler is passed straight to ``reactivex.catch``
        # together with the source; a callable is invoked per error by
        # ``catch_handler``.
        if not callable(handler):
            return reactivex.catch(source, handler)
        return catch_handler(source, handler)

    return catch


# Names exported by a star-import of this module; `catch_handler` is not listed.
__all__ = ["catch_"]