File: _skipwithtime.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (66 lines) | stat: -rw-r--r-- 2,220 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from typing import Any, Callable, Optional, TypeVar

from reactivex import Observable, abc, typing
from reactivex.disposable import CompositeDisposable
from reactivex.scheduler import TimeoutScheduler

_T = TypeVar("_T")


def skip_with_time_(
    duration: typing.RelativeTime, scheduler: Optional[abc.SchedulerBase] = None
) -> Callable[[Observable[_T]], Observable[_T]]:
    """Build an operator that drops elements emitted during an initial
    time window.

    Examples:
        >>> skip_five_seconds = skip_with_time_(5.0)
        >>> res = skip_five_seconds(source)

    Args:
        duration: Duration for skipping elements from the start of
            the sequence. The window is timed separately for every
            subscription.
        scheduler: Scheduler used to time the skip window. When
            omitted, the scheduler passed at subscription time is used,
            falling back to ``TimeoutScheduler.singleton()``.

    Returns:
        A function that takes a source observable and returns an
        observable with the elements skipped during the specified
        duration from the start of the source sequence.
    """

    def skip_with_time(source: Observable[_T]) -> Observable[_T]:
        """Skips elements for the specified duration from the start of
        the observable source sequence.

        Specifying a zero value for duration doesn't guarantee no
        elements will be dropped from the start of the source sequence.
        This is a side-effect of the asynchrony introduced by the
        scheduler, where the action that causes callbacks from the
        source sequence to be forwarded may not execute immediately,
        despite the zero due time.

        Errors produced by the source sequence are always forwarded to
        the result sequence, even if the error occurs before the
        duration.

        Args:
            source: The observable sequence whose leading elements are
                skipped.

        Returns:
            An observable sequence with the elements skipped during the
            specified duration from the start of the source sequence.
        """

        def subscribe(
            observer: abc.ObserverBase[_T],
            scheduler_: Optional[abc.SchedulerBase] = None,
        ) -> abc.DisposableBase:
            # Precedence: operator-level scheduler, then the one supplied by
            # the subscriber, then the shared timeout scheduler.
            _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()

            # Per-subscription gate; stays closed until the timer fires.
            gate_open = False

            def open_gate(_sched: abc.SchedulerBase, _state: Any) -> None:
                nonlocal gate_open
                gate_open = True

            # Start the timer before subscribing so the window is measured
            # from the moment of subscription.
            timer = _scheduler.schedule_relative(duration, open_gate)

            def on_next(value: _T) -> None:
                if gate_open:
                    observer.on_next(value)

            # Only on_next is gated: errors and completion pass straight
            # through to the downstream observer.
            subscription = source.subscribe(
                on_next, observer.on_error, observer.on_completed, scheduler=scheduler_
            )

            # Disposing the result cancels both the pending timer and the
            # upstream subscription.
            return CompositeDisposable(timer, subscription)

        return Observable(subscribe)

    return skip_with_time


# Names exported by ``from <this module> import *``: only the operator factory.
__all__ = ["skip_with_time_"]