File: zip.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (90 lines) | stat: -rw-r--r-- 2,828 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from asyncio import Future
from threading import RLock
from typing import Any, List, Optional, Tuple

from reactivex import Observable, abc, from_future
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable
from reactivex.internal import synchronized


def zip_(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]:
    """Merges the specified observable sequences into one observable
    sequence by creating a tuple whenever all of the
    observable sequences have produced an element at a corresponding
    index.

    Example:
        >>> res = zip(obs1, obs2)

    Args:
        args: Observable sources to zip.

    Returns:
        An observable sequence containing the result of combining
        elements of the sources as tuple.
    """

    sources = list(args)

    def subscribe(
        observer: abc.ObserverBase[Any], scheduler: Optional[abc.SchedulerBase] = None
    ) -> CompositeDisposable:
        n = len(sources)
        queues: List[List[Any]] = [[] for _ in range(n)]
        lock = RLock()
        is_completed = [False] * n

        @synchronized(lock)
        def next_(i: int) -> None:
            if all(len(q) for q in queues):
                try:
                    queued_values = [x.pop(0) for x in queues]
                    res = tuple(queued_values)
                except Exception as ex:  # pylint: disable=broad-except
                    observer.on_error(ex)
                    return

                observer.on_next(res)

                # after sending the zipped values, complete the observer if at least one
                # upstream observable is completed and its queue has length zero
                if any(
                    (
                        done
                        for queue, done in zip(queues, is_completed)
                        if len(queue) == 0
                    )
                ):
                    observer.on_completed()

        def completed(i: int) -> None:
            is_completed[i] = True
            if len(queues[i]) == 0:
                observer.on_completed()

        subscriptions: List[Optional[abc.DisposableBase]] = [None] * n

        def func(i: int) -> None:
            source: Observable[Any] = sources[i]
            if isinstance(source, Future):
                source = from_future(source)

            sad = SingleAssignmentDisposable()

            def on_next(x: Any) -> None:
                queues[i].append(x)
                next_(i)

            sad.disposable = source.subscribe(
                on_next, observer.on_error, lambda: completed(i), scheduler=scheduler
            )
            subscriptions[i] = sad

        for idx in range(n):
            func(idx)
        return CompositeDisposable(subscriptions)

    return Observable(subscribe)


__all__ = ["zip_"]