File: _partition.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (88 lines) | stat: -rw-r--r-- 3,021 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from typing import Callable, List, TypeVar

from reactivex import Observable
from reactivex import operators as ops
from reactivex.typing import Predicate, PredicateIndexed

_T = TypeVar("_T")


def partition_(
    predicate: Predicate[_T],
) -> Callable[[Observable[_T]], List[Observable[_T]]]:
    def partition(source: Observable[_T]) -> List[Observable[_T]]:
        """The partially applied `partition` operator.

        Splits the observations of the source into two observables
        according to the predicate. The first observable emits the
        values for which the predicate returns true; the second emits
        the values for which it returns false. Each returned observable
        applies its own filter, so the predicate is consulted
        independently by each half. Both halves also propagate error
        observations arising from the source, and each completes when
        the source completes.

        Args:
            source: Source observable to partition.

        Returns:
            A two-element list of observables: the first carries the
            values accepted by the predicate, the second the values it
            rejects.
        """

        def rejects(value: _T) -> bool:
            # Logical complement of the caller-supplied predicate.
            return not predicate(value)

        # Both halves are derived from one published, ref-counted
        # pipeline over the source rather than from the source directly.
        shared = source.pipe(ops.publish(), ops.ref_count())

        accepted = shared.pipe(ops.filter(predicate))
        rejected = shared.pipe(ops.filter(rejects))
        return [accepted, rejected]

    return partition


def partition_indexed_(
    predicate_indexed: PredicateIndexed[_T],
) -> Callable[[Observable[_T]], List[Observable[_T]]]:
    def partition_indexed(source: Observable[_T]) -> List[Observable[_T]]:
        """The partially applied indexed partition operator.

        Splits the observations of the source into two observables
        according to an indexed predicate, i.e. one that is called with
        both the value and its index. The first observable emits the
        values for which the predicate returns true; the second emits
        the values for which it returns false. Each returned observable
        applies its own filter, so the predicate is consulted
        independently by each half. Both halves also propagate error
        observations arising from the source, and each completes when
        the source completes.

        Args:
            source: Source observable to partition.

        Returns:
            A two-element list of observables: the first carries the
            values accepted by the predicate, the second the values it
            rejects.
        """

        def rejects_indexed(value: _T, index: int) -> bool:
            # Logical complement of the caller-supplied indexed predicate.
            return not predicate_indexed(value, index)

        # Both halves are derived from one published, ref-counted
        # pipeline over the source rather than from the source directly.
        shared = source.pipe(ops.publish(), ops.ref_count())

        accepted = shared.pipe(ops.filter_indexed(predicate_indexed))
        rejected = shared.pipe(ops.filter_indexed(rejects_indexed))
        return [accepted, rejected]

    return partition_indexed


# Names exported by `from <module> import *`: the two operator factories.
__all__ = [
    "partition_",
    "partition_indexed_",
]