File: fromcallback.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (59 lines) | stat: -rw-r--r-- 1,878 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from typing import Any, Callable, Optional

from reactivex import Observable, abc, typing
from reactivex.disposable import Disposable


def from_callback_(
    func: Callable[..., Callable[..., None]],
    mapper: Optional[typing.Mapper[Any, Any]] = None,
) -> Callable[..., Observable[Any]]:
    """Converts a callback function to an observable sequence.

    Args:
        func: Function with a callback as the last argument to
            convert to an Observable sequence.
        mapper: [Optional] A mapper which takes the arguments
            from the callback (as a single tuple) to produce a single
            item to yield on next.

    Returns:
        A function, when executed with the required arguments minus
        the callback, produces an Observable sequence. Each
        subscription invokes ``func`` with those arguments plus a fresh
        callback. When the callback fires, the sequence emits one item
        and then completes:

        * with a mapper: the mapper's result (or ``on_error`` if the
          mapper raises);
        * without a mapper: ``None`` for no callback arguments, the
          argument itself for exactly one, or a list of the arguments
          for more than one.
    """

    def function(*args: Any) -> Observable[Any]:
        def subscribe(
            observer: abc.ObserverBase[Any],
            scheduler: Optional[abc.SchedulerBase] = None,
        ) -> abc.DisposableBase:
            def handler(*results: Any) -> None:
                if mapper is not None:
                    try:
                        value = mapper(results)
                    except Exception as err:  # pylint: disable=broad-except
                        observer.on_error(err)
                        return
                elif not results:
                    # Callback fired without arguments: still emit one item
                    # instead of raising TypeError inside the caller's code.
                    value = None
                elif len(results) == 1:
                    value = results[0]
                else:
                    value = list(results)

                observer.on_next(value)
                # Complete on both the mapped and unmapped paths so the
                # sequence always terminates after its single value.
                observer.on_completed()

            # Build the argument list per subscription; appending to a shared
            # list would pass stale handlers from earlier subscriptions.
            func(*args, handler)
            return Disposable()

        return Observable(subscribe)

    return function


# Names exported by a wildcard import of this module.
__all__ = ["from_callback_"]