File: reactivetest.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (84 lines) | stat: -rw-r--r-- 2,119 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import math
import types
from typing import Any, Generic, TypeVar, Union

from reactivex import typing
from reactivex.notification import OnCompleted, OnError, OnNext

from .recorded import Recorded
from .subscription import Subscription

_T = TypeVar("_T")


def is_prime(i: int) -> bool:
    """Tests if number is prime or not"""

    if i <= 1:
        return False

    _max = int(math.floor(math.sqrt(i)))
    for j in range(2, _max + 1):
        if not i % j:
            return False

    return True


# New predicate tests
class OnNextPredicate(Generic[_T]):
    def __init__(self, predicate: typing.Predicate[_T]) -> None:
        self.predicate = predicate

    def __eq__(self, other: Any) -> bool:
        if other == self:
            return True
        if other is None:
            return False
        if other.kind != "N":
            return False
        return self.predicate(other.value)


class OnErrorPredicate(Generic[_T]):
    def __init__(self, predicate: typing.Predicate[_T]):
        self.predicate = predicate

    def __eq__(self, other: Any) -> bool:
        if other == self:
            return True
        if other is None:
            return False
        if other.kind != "E":
            return False
        return self.predicate(other.exception)


class ReactiveTest:
    created = 100
    subscribed = 200
    disposed = 1000

    @staticmethod
    def on_next(ticks: int, value: _T) -> Recorded[_T]:
        if isinstance(value, types.FunctionType):
            return Recorded(ticks, OnNextPredicate(value))

        return Recorded(ticks, OnNext(value))

    @staticmethod
    def on_error(
        ticks: int, error: Union[Exception, str, types.FunctionType]
    ) -> Recorded[Any]:
        if isinstance(error, types.FunctionType):
            return Recorded(ticks, OnErrorPredicate(error))

        return Recorded(ticks, OnError(error))

    @staticmethod
    def on_completed(ticks: int) -> Recorded[Any]:
        return Recorded(ticks, OnCompleted())

    @staticmethod
    def subscribe(start: int, end: int) -> Subscription:
        return Subscription(start, end)