File: test_ref.py

package info (click to toggle)
python-pykka 4.2.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 520 kB
  • sloc: python: 2,817; makefile: 113
file content (214 lines) | stat: -rw-r--r-- 5,027 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Callable

import pytest

from pykka import Actor, ActorDeadError, Timeout

if TYPE_CHECKING:
    from collections.abc import Iterator

    from pykka import ActorRef, Future
    from tests.types import Runtime


class ReferencableActor(Actor):
    """Actor used as the target of the ``ActorRef`` tests in this module.

    It answers ``"ping"`` with ``"pong"`` after a short sleep. Any other
    message is stored in the ``received_message`` future and answered with
    ``None``.
    """

    # NOTE(review): plural name; nothing in this file reads it, and instances
    # use ``received_message`` (singular) instead -- confirm whether it is
    # still needed.
    received_messages = None

    def __init__(
        self,
        sleep_func: Callable[[float], None],
        received_message: Future[str],
    ) -> None:
        """Store the sleep function and the future that records messages."""
        super().__init__()
        self.received_message = received_message
        self.sleep_func = sleep_func

    def on_receive(self, message: str) -> Any:
        """Reply to ``"ping"``; record every other message in the future."""
        is_ping = message == "ping"
        if not is_ping:
            self.received_message.set(message)
            return None

        # Sleep briefly before replying, using the sleep function passed in.
        self.sleep_func(0.01)
        return "pong"


@pytest.fixture(scope="module")
def actor_class(runtime: Runtime) -> type[ReferencableActor]:
    """Return a ``ReferencableActor`` subclass bound to the given runtime.

    The returned class inherits from both ``ReferencableActor`` and
    ``runtime.actor_class``. The fixture is module-scoped, so pytest reuses
    the class across tests in this module instead of rebuilding it for each
    test.
    """
    # The class name is kept as-is: the repr/str tests look for the substring
    # "ReferencableActor" in the ref's textual form.
    class ReferencableActorImpl(ReferencableActor, runtime.actor_class):  # type: ignore[name-defined]
        pass

    return ReferencableActorImpl


@pytest.fixture
def received_message(runtime: Runtime) -> Future[str]:
    """Provide a new future built from the runtime's future class."""
    future_factory = runtime.future_class
    return future_factory()


@pytest.fixture
def actor_ref(
    runtime: Runtime,
    actor_class: type[ReferencableActor],
    received_message: Future[str],
) -> Iterator[ActorRef[ReferencableActor]]:
    """Start an actor for the test, yield its ref, and stop it afterwards.

    The actor is started with the runtime's sleep function and the
    ``received_message`` future, so tests can observe what it received.
    """
    started = actor_class.start(runtime.sleep_func, received_message)
    yield started
    # Teardown: runs once the test using this fixture has finished.
    started.stop()


def test_repr_is_wrapped_in_lt_and_gt(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """The ref's repr starts with ``<`` and ends with ``>``."""
    text = repr(actor_ref)

    assert text[:1] == "<"
    assert text[-1:] == ">"


def test_repr_reveals_that_this_is_a_ref(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """The ref's repr contains the text ``ActorRef``."""
    text = repr(actor_ref)

    assert "ActorRef" in text


def test_repr_contains_actor_class_name(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """The ref's repr contains the text ``ReferencableActor``."""
    text = repr(actor_ref)

    assert "ReferencableActor" in text


def test_repr_contains_actor_urn(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """The ref's repr contains the ref's ``actor_urn``."""
    urn = actor_ref.actor_urn

    assert urn in repr(actor_ref)


def test_str_contains_actor_class_name(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """The ref's ``str()`` form contains the text ``ReferencableActor``."""
    text = str(actor_ref)

    assert "ReferencableActor" in text


def test_str_contains_actor_urn(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """The ref's ``str()`` form contains the ref's ``actor_urn``."""
    urn = actor_ref.actor_urn

    assert urn in str(actor_ref)


def test_is_alive_returns_true_for_running_actor(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """``is_alive()`` is truthy for the actor started by the fixture."""
    alive = actor_ref.is_alive()

    assert alive


def test_is_alive_returns_false_for_dead_actor(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """``is_alive()`` is falsy once ``stop()`` has been called on the ref."""
    actor_ref.stop()

    alive_after_stop = actor_ref.is_alive()

    assert not alive_after_stop


def test_stop_returns_true_if_actor_is_stopped(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """``stop()`` returns a truthy value when called on the running actor."""
    stopped = actor_ref.stop()

    assert stopped


def test_stop_does_not_stop_already_dead_actor(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """A second ``stop()`` returns a falsy value after the first succeeded."""
    first_stop = actor_ref.stop()
    assert first_stop

    second_stop = actor_ref.stop()
    assert not second_stop


def test_tell_delivers_message_to_actors_custom_on_receive(
    actor_ref: ActorRef[ReferencableActor],
    received_message: Future[str],
) -> None:
    """A message sent with ``tell()`` ends up in the actor's future."""
    sent = "a custom message"

    actor_ref.tell(sent)

    # Wait at most one second for the actor to record the message.
    delivered = received_message.get(timeout=1)
    assert delivered == sent


@pytest.mark.parametrize(
    "message",
    [
        123,
        123.456,
        {"a": "dict"},
        ("a", "tuple"),
        ["a", "list"],
        Exception("an exception"),
    ],
)
def test_tell_accepts_any_object_as_the_message(
    actor_ref: ActorRef[ReferencableActor],
    message: Any,
    received_message: Future[Any],
) -> None:
    """``tell()`` delivers non-string objects to the actor as well."""
    actor_ref.tell(message)

    # Wait at most one second for the actor to record the message.
    delivered = received_message.get(timeout=1)
    assert delivered == message


def test_tell_fails_if_actor_is_stopped(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """``tell()`` on a stopped ref raises ``ActorDeadError`` naming the ref."""
    actor_ref.stop()

    with pytest.raises(ActorDeadError) as raised:
        actor_ref.tell("a custom message")

    expected_text = f"{actor_ref} not found"
    assert str(raised.value) == expected_text


def test_ask_blocks_until_response_arrives(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """A plain ``ask("ping")`` returns the reply value ``"pong"`` directly."""
    reply = actor_ref.ask("ping")

    assert reply == "pong"


def test_ask_can_timeout_if_blocked_too_long(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """``ask("ping", timeout=0)`` raises ``Timeout``."""
    expect_timeout = pytest.raises(Timeout)

    # The actor sleeps before answering "ping", so a zero timeout is expected
    # to run out before the reply is available.
    with expect_timeout:
        actor_ref.ask("ping", timeout=0)


def test_ask_can_return_future_instead_of_blocking(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """``ask(..., block=False)`` returns a future that yields the reply."""
    future = actor_ref.ask("ping", block=False)

    # Bound the wait (one second, like the ``tell`` tests in this module) so
    # that a reply that never arrives fails the test instead of hanging it.
    assert future.get(timeout=1) == "pong"


def test_ask_fails_if_actor_is_stopped(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """``ask()`` on a stopped ref raises ``ActorDeadError`` naming the ref."""
    actor_ref.stop()

    with pytest.raises(ActorDeadError) as raised:
        actor_ref.ask("ping")

    expected_text = f"{actor_ref} not found"
    assert str(raised.value) == expected_text


def test_ask_nonblocking_fails_future_if_actor_is_stopped(
    actor_ref: ActorRef[ReferencableActor],
) -> None:
    """A non-blocking ``ask()`` on a stopped ref fails through the future.

    The call itself returns a future; ``ActorDeadError`` is raised only when
    the future's value is requested.
    """
    actor_ref.stop()
    future = actor_ref.ask("ping", block=False)

    with pytest.raises(ActorDeadError) as exc_info:
        # Bound the wait so that a future which is never failed makes the
        # test fail (with ``Timeout``) instead of hanging the test run.
        future.get(timeout=1)

    assert str(exc_info.value) == f"{actor_ref} not found"