File: _whiledo.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (38 lines) | stat: -rw-r--r-- 1,071 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import itertools
from asyncio import Future
from typing import Callable, TypeVar, Union

import reactivex
from reactivex import Observable
from reactivex.internal.utils import infinite
from reactivex.typing import Predicate

_T = TypeVar("_T")


def while_do_(
    condition: Predicate[Observable[_T]],
) -> Callable[[Observable[_T]], Observable[_T]]:
    def while_do(source: Union[Observable[_T], "Future[_T]"]) -> Observable[_T]:
        """Repeats source as long as condition holds emulating a while
        loop.

        Args:
            source: The observable sequence that will be run if the
                condition function returns true.

        Returns:
            An observable sequence which is repeated as long as the
            condition holds.
        """
        if isinstance(source, Future):
            obs = reactivex.from_future(source)
        else:
            obs = source
        it = itertools.takewhile(condition, (obs for _ in infinite()))
        return reactivex.concat_with_iterable(it)

    return while_do


__all__ = ["while_do_"]