File: recorded.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (41 lines) | stat: -rw-r--r-- 1,095 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from typing import TYPE_CHECKING, Any, Generic, TypeVar, Union, cast

from reactivex import Notification

if TYPE_CHECKING:
    from .reactivetest import OnErrorPredicate, OnNextPredicate


# Type parameter shared by Recorded and the notification/predicate types it
# wraps (see the annotation of Recorded.__init__'s ``value`` argument).
_T = TypeVar("_T")


class Recorded(Generic[_T]):
    """A value paired with the time at which it was recorded.

    Two instances compare equal when both their ``time`` and their ``value``
    compare equal. Because the class defines ``__eq__`` without defining
    ``__hash__``, Python leaves its instances unhashable.
    """

    def __init__(
        self,
        time: int,
        value: Union[Notification[_T], "OnNextPredicate[_T]", "OnErrorPredicate[_T]"],
        # comparer: Optional[typing.Comparer[_T]] = None,
    ):
        """Store the recording time and the recorded value.

        Args:
            time: Time at which the value was recorded.
            value: The recorded item; per the annotation, a notification or
                an on-next / on-error predicate object.
        """
        self.time = time
        self.value = value
        # Custom comparer support is disabled; the original line is kept for
        # reference:
        # self.comparer = comparer or default_comparer

    def __eq__(self, other: Any) -> bool:
        """Return whether ``other`` is a ``Recorded`` with equal time and value.

        Anything that is not a ``Recorded`` instance is never considered
        equal. The values are only compared once the times are found to match.
        """
        if not isinstance(other, Recorded):
            return False

        # The cast only informs the type checker; it is a no-op at runtime.
        peer = cast(Recorded[_T], other)
        if self.time == peer.time:
            return self.value == peer.value
        return False

    # Method-style spelling of ``==``; bound to the very same function.
    equals = __eq__

    def __repr__(self) -> str:
        """Return the same text as ``str(self)``."""
        return str(self)

    def __str__(self) -> str:
        """Return ``"<value>@<time>"`` built from ``str()`` of both fields."""
        # ``!s`` forces ``str()`` on each field, matching ``%s`` formatting.
        return f"{self.value!s}@{self.time!s}"