from typing import Callable, Optional, TypeVar

from reactivex import Observable, abc
from reactivex.typing import Predicate, PredicateIndexed

_T = TypeVar("_T")


# pylint: disable=redefined-builtin
def filter_(predicate: Predicate[_T]) -> Callable[[Observable[_T]], Observable[_T]]:
    def filter(source: Observable[_T]) -> Observable[_T]:
        """Partially applied filter operator.

        Filters the elements of an observable sequence based on a
        predicate.

        Example:
            >>> filter(source)

        Args:
            source: Source observable to filter.

        Returns:
            A filtered observable sequence.
        """

        def subscribe(
            observer: abc.ObserverBase[_T], scheduler: Optional[abc.SchedulerBase]
        ) -> abc.DisposableBase:
            def on_next(value: _T):
                try:
                    should_run = predicate(value)
                except Exception as ex:  # pylint: disable=broad-except
                    observer.on_error(ex)
                    return

                if should_run:
                    observer.on_next(value)

            return source.subscribe(
                on_next, observer.on_error, observer.on_completed, scheduler=scheduler
            )

        return Observable(subscribe)

    return filter


def filter_indexed_(
    predicate_indexed: Optional[PredicateIndexed[_T]] = None,
) -> Callable[[Observable[_T]], Observable[_T]]:
    def filter_indexed(source: Observable[_T]) -> Observable[_T]:
        """Partially applied indexed filter operator.

        Filters the elements of an observable sequence based on a
        predicate by incorporating the element's index.

        Example:
            >>> filter_indexed(source)

        Args:
            source: Source observable to filter.

        Returns:
            A filtered observable sequence.
        """

        def subscribe(
            observer: abc.ObserverBase[_T], scheduler: Optional[abc.SchedulerBase]
        ):
            count = 0

            def on_next(value: _T):
                nonlocal count
                should_run = True

                if predicate_indexed:
                    try:
                        should_run = predicate_indexed(value, count)
                    except Exception as ex:  # pylint: disable=broad-except
                        observer.on_error(ex)
                        return
                    else:
                        count += 1

                if should_run:
                    observer.on_next(value)

            return source.subscribe(
                on_next, observer.on_error, observer.on_completed, scheduler=scheduler
            )

        return Observable(subscribe)

    return filter_indexed


__all__ = ["filter_", "filter_indexed_"]