1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
|
from contextlib import contextmanager
from typing import Any, Dict, Generator, List, NamedTuple, Optional, Tuple, Union
from warnings import warn
import reactivex
from reactivex import Observable, typing
from reactivex.notification import Notification, OnError, OnNext
from reactivex.observable.marbles import parse
from reactivex.scheduler import NewThreadScheduler
from reactivex.typing import Callable, RelativeTime
from .reactivetest import ReactiveTest
from .recorded import Recorded
from .testscheduler import TestScheduler
new_thread_scheduler = NewThreadScheduler()
class MarblesContext(NamedTuple):
start: Callable[
[Union[Observable[Any], Callable[[], Observable[Any]]]], List[Recorded[Any]]
]
cold: Callable[
[str, Optional[Dict[Union[str, float], Any]], Optional[Exception]],
Observable[Any],
]
hot: Callable[
[str, Optional[Dict[Union[str, float], Any]], Optional[Exception]],
Observable[Any],
]
exp: Callable[
[str, Optional[Dict[Union[str, float], Any]], Optional[Exception]],
List[Recorded[Any]],
]
@contextmanager
def marbles_testing(
timespan: RelativeTime = 1.0,
) -> Generator[MarblesContext, None, None]:
"""
Initialize a :class:`rx.testing.TestScheduler` and return a namedtuple
containing the following functions that wrap its methods.
:func:`cold()`:
Parse a marbles string and return a cold observable
:func:`hot()`:
Parse a marbles string and return a hot observable
:func:`start()`:
Start the test scheduler, invoke the create function,
subscribe to the resulting sequence, dispose the subscription and
return the resulting records
:func:`exp()`:
Parse a marbles string and return a list of records
Examples:
>>> with marbles_testing() as (start, cold, hot, exp):
... obs = hot("-a-----b---c-|")
... ex = exp( "-a-----b---c-|")
... results = start(obs)
... assert results == ex
The underlying test scheduler is initialized with the following
parameters:
- created time = 100.0s
- subscribed = 200.0s
- disposed = 1000.0s
**IMPORTANT**: regarding :func:`hot()`, a marble declared as the
first character will be skipped by the test scheduler.
E.g. hot("a--b--") will only emit b.
"""
scheduler = TestScheduler()
created = 100.0
disposed = 1000.0
subscribed = 200.0
start_called = False
outside_of_context = False
def check() -> None:
if outside_of_context:
warn(
"context functions should not be called outside of " "with statement.",
UserWarning,
stacklevel=3,
)
if start_called:
warn(
"start() should only be called one time inside " "a with statement.",
UserWarning,
stacklevel=3,
)
def test_start(
create: Union[Observable[Any], Callable[[], Observable[Any]]]
) -> List[Recorded[Any]]:
nonlocal start_called
check()
if isinstance(create, Observable):
create_ = create
def default_create() -> Observable[Any]:
return create_
create_function = default_create
else:
create_function = create
mock_observer = scheduler.start(
create=create_function,
created=created,
subscribed=subscribed,
disposed=disposed,
)
start_called = True
return mock_observer.messages
def test_expected(
string: str,
lookup: Optional[Dict[Union[str, float], Any]] = None,
error: Optional[Exception] = None,
) -> List[Recorded[Any]]:
messages = parse(
string,
timespan=timespan,
time_shift=subscribed,
lookup=lookup,
error=error,
)
return messages_to_records(messages)
def test_cold(
string: str,
lookup: Optional[Dict[Union[str, float], Any]] = None,
error: Optional[Exception] = None,
) -> Observable[Any]:
check()
return reactivex.from_marbles(
string,
timespan=timespan,
lookup=lookup,
error=error,
)
def test_hot(
string: str,
lookup: Optional[Dict[Union[str, float], Any]] = None,
error: Optional[Exception] = None,
) -> Observable[Any]:
check()
hot_obs: Observable[Any] = reactivex.hot(
string,
timespan=timespan,
duetime=subscribed,
lookup=lookup,
error=error,
scheduler=scheduler,
)
return hot_obs
try:
yield MarblesContext(test_start, test_cold, test_hot, test_expected)
finally:
outside_of_context = True
def messages_to_records(
messages: List[Tuple[typing.RelativeTime, Notification[Any]]]
) -> List[Recorded[Any]]:
"""
Helper function to convert messages returned by parse() to a list of
Recorded.
"""
records: List[Recorded[Any]] = []
for message in messages:
time, notification = message
if isinstance(time, float):
time_ = int(time)
else:
time_ = time.microseconds // 1000
if isinstance(notification, OnNext):
record = ReactiveTest.on_next(time_, notification.value)
elif isinstance(notification, OnError):
record = ReactiveTest.on_error(time_, notification.exception)
else:
record = ReactiveTest.on_completed(time_)
records.append(record)
return records
|