File: _lastordefault.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (69 lines) | stat: -rw-r--r-- 1,968 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from typing import Any, Callable, Optional, TypeVar

from reactivex import Observable, abc
from reactivex import operators as ops
from reactivex import typing
from reactivex.internal.exceptions import SequenceContainsNoElementsError

# Generic element type used in the Observable/observer annotations below.
_T = TypeVar("_T")


def last_or_default_async(
    source: Observable[_T],
    has_default: bool = False,
    default_value: Optional[_T] = None,
) -> Observable[Optional[_T]]:
    """Build an observable that emits only the final element of ``source``.

    Every element pushed by ``source`` overwrites the remembered one; nothing
    is forwarded until ``source`` completes.

    Args:
        source: Observable sequence to take the last element from.
        has_default: When true, a ``source`` that completes without pushing
            any element yields ``default_value`` instead of an error.
        default_value: Fallback emitted for an empty ``source`` when
            ``has_default`` is true.

    Returns:
        An observable that, once ``source`` completes, emits a single value
        followed by completion. If ``source`` pushed no element and
        ``has_default`` is false, the observer instead receives a
        ``SequenceContainsNoElementsError`` through ``on_error``. Errors
        raised by ``source`` are handed to the observer's ``on_error``.
    """

    def subscribe(
        observer: abc.ObserverBase[Optional[_T]],
        scheduler: Optional[abc.SchedulerBase] = None,
    ):
        # Start from the fallback so an empty source naturally yields it.
        latest: Optional[_T] = default_value
        received = False

        def remember(item: _T) -> None:
            nonlocal latest, received
            latest = item
            received = True

        def finish():
            if received or has_default:
                observer.on_next(latest)
                observer.on_completed()
                return
            # Empty source and no fallback allowed: report it as an error.
            observer.on_error(SequenceContainsNoElementsError())

        return source.subscribe(
            remember, observer.on_error, finish, scheduler=scheduler
        )

    return Observable(subscribe)


def last_or_default(
    default_value: Optional[_T] = None, predicate: Optional[typing.Predicate[_T]] = None
) -> Callable[[Observable[_T]], Observable[Any]]:
    """Create an operator that yields the last element of a sequence or a default.

    Examples:
        >>> op = last_or_default(42)
        >>> op = last_or_default(42, lambda x: x > 3)
        >>> res = op(source)

    Args:
        default_value: Value emitted when the source completes without
            producing an element. When ``predicate`` is given, it is passed
            on to ``ops.last_or_default`` applied to the filtered sequence.
        predicate: Optional condition; when supplied, the source is first
            filtered with ``ops.filter(predicate)``.

    Returns:
        A function that takes a source observable and returns an observable
        containing its last element, or ``default_value``.
    """

    def last_or_default(source: Observable[Any]) -> Observable[Any]:
        """Return last or default element.

        Args:
            source: Observable sequence to get the last item from.

        Returns:
            Observable sequence containing the last element in the
            observable sequence.
        """

        # Test against None explicitly: a callable object may define
        # __bool__/__len__ and be falsy, yet it must still be applied.
        if predicate is not None:
            return source.pipe(
                ops.filter(predicate),
                ops.last_or_default(default_value),
            )

        # has_default=True: an empty source yields default_value, not an error.
        return last_or_default_async(source, True, default_value)

    return last_or_default


# Public names of this module (what ``from ... import *`` exposes).
__all__ = ["last_or_default", "last_or_default_async"]