File: concat.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (62 lines) | stat: -rw-r--r-- 1,884 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from typing import Any, Iterable, Optional, TypeVar

from reactivex import Observable, abc
from reactivex.disposable import (
    CompositeDisposable,
    Disposable,
    SerialDisposable,
    SingleAssignmentDisposable,
)
from reactivex.scheduler import CurrentThreadScheduler

_T = TypeVar("_T")


def concat_with_iterable_(sources: Iterable[Observable[_T]]) -> Observable[_T]:
    def subscribe(
        observer: abc.ObserverBase[_T], scheduler_: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        _scheduler = scheduler_ or CurrentThreadScheduler.singleton()

        sources_ = iter(sources)

        subscription = SerialDisposable()
        cancelable = SerialDisposable()
        is_disposed = False

        def action(scheduler: abc.SchedulerBase, state: Any = None) -> None:
            nonlocal is_disposed
            if is_disposed:
                return

            def on_completed() -> None:
                cancelable.disposable = _scheduler.schedule(action)

            try:
                current = next(sources_)
            except StopIteration:
                observer.on_completed()
            except Exception as ex:  # pylint: disable=broad-except
                observer.on_error(ex)
            else:
                d = SingleAssignmentDisposable()
                subscription.disposable = d
                d.disposable = current.subscribe(
                    observer.on_next,
                    observer.on_error,
                    on_completed,
                    scheduler=scheduler_,
                )

        cancelable.disposable = _scheduler.schedule(action)

        def dispose() -> None:
            nonlocal is_disposed
            is_disposed = True

        return CompositeDisposable(subscription, cancelable, Disposable(dispose))

    return Observable(subscribe)


__all__ = ["concat_with_iterable_"]