1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
from collections import OrderedDict
from typing import Any, Callable, Optional, Tuple, TypeVar
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable
from reactivex.internal import noop
from reactivex.operators import take
# Element types of the two joined sequences: _T1 for the source (left)
# observable and _T2 for the `right` observable passed to `join_`.
_T1 = TypeVar("_T1")
_T2 = TypeVar("_T2")
def join_(
    right: Observable[_T2],
    left_duration_mapper: Callable[[Any], Observable[Any]],
    right_duration_mapper: Callable[[Any], Observable[Any]],
) -> Callable[[Observable[_T1]], Observable[Tuple[_T1, _T2]]]:
    """Build an operator correlating two sequences by overlapping durations.

    Args:
        right: The right observable sequence to join with the source.
        left_duration_mapper: Called with each element of the source (left)
            sequence. The element stays active until the returned observable
            emits its first item or completes.
        right_duration_mapper: Same as ``left_duration_mapper`` but for
            elements of ``right``.

    Returns:
        A function that takes the source (left) observable and returns an
        observable of ``(left_value, right_value)`` tuples.
    """

    def join(source: Observable[_T1]) -> Observable[Tuple[_T1, _T2]]:
        """Correlates the elements of two sequences based on
        overlapping durations.

        Args:
            source: Source observable.

        Returns:
            An observable sequence that contains elements
            combined into a tuple from source elements that have an
            overlapping duration.
        """
        left = source

        def subscribe(
            observer: abc.ObserverBase[Tuple[_T1, _T2]],
            scheduler: Optional[abc.SchedulerBase] = None,
        ) -> abc.DisposableBase:
            # Holds both upstream subscriptions plus one disposable per
            # currently active duration window.
            group = CompositeDisposable()

            # Active (not yet expired) values per side, keyed by arrival id
            # so that pairs are emitted in arrival order.
            left_done = False
            left_map: OrderedDict[int, _T1] = OrderedDict()
            left_id = 0

            right_done = False
            right_map: OrderedDict[int, _T2] = OrderedDict()
            right_id = 0

            def on_next_left(value: _T1) -> None:
                nonlocal left_id
                current_id = left_id
                left_id += 1
                md = SingleAssignmentDisposable()

                # Register the value before asking for its duration so a
                # duration that ends synchronously can still expire it.
                left_map[current_id] = value
                group.add(md)

                def expire() -> None:
                    left_map.pop(current_id, None)
                    # Left is finished and has no active values left, so no
                    # further pair can ever be produced.
                    if not left_map and left_done:
                        observer.on_completed()
                    group.remove(md)

                try:
                    duration = left_duration_mapper(value)
                except Exception as exception:
                    observer.on_error(exception)
                    return

                # take(1): the window closes on the first item or on
                # completion of the duration observable, whichever is first.
                md.disposable = duration.pipe(take(1)).subscribe(
                    noop, observer.on_error, expire, scheduler=scheduler
                )

                # Iterate over a snapshot: a downstream reaction to on_next
                # may synchronously add or expire right values, and mutating
                # the map while iterating it raises RuntimeError.
                for val in tuple(right_map.values()):
                    observer.on_next((value, val))

            def on_completed_left() -> None:
                nonlocal left_done
                left_done = True
                if right_done or not left_map:
                    observer.on_completed()

            group.add(
                left.subscribe(
                    on_next_left,
                    observer.on_error,
                    on_completed_left,
                    scheduler=scheduler,
                )
            )

            def on_next_right(value: _T2) -> None:
                nonlocal right_id
                current_id = right_id
                right_id += 1
                md = SingleAssignmentDisposable()

                # Same registration order as on the left side.
                right_map[current_id] = value
                group.add(md)

                def expire() -> None:
                    right_map.pop(current_id, None)
                    if not right_map and right_done:
                        observer.on_completed()
                    group.remove(md)

                try:
                    duration = right_duration_mapper(value)
                except Exception as exception:
                    observer.on_error(exception)
                    return

                md.disposable = duration.pipe(take(1)).subscribe(
                    noop, observer.on_error, expire, scheduler=scheduler
                )

                # Snapshot for the same re-entrancy reason as on the left.
                for val in tuple(left_map.values()):
                    observer.on_next((val, value))

            def on_completed_right() -> None:
                nonlocal right_done
                right_done = True
                if left_done or not right_map:
                    observer.on_completed()

            group.add(
                right.subscribe(
                    on_next_right,
                    observer.on_error,
                    on_completed_right,
                    scheduler=scheduler,
                )
            )
            return group

        return Observable(subscribe)

    return join
# Names exported by `from <this module> import *`: only the operator factory.
__all__ = ["join_"]
|