1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
from __future__ import annotations
import logging
import threading
import time
from typing import (
TYPE_CHECKING,
Any,
cast,
)
import pytest
from pykka import ActorRegistry, ThreadingActor, ThreadingFuture
from tests.log_handler import PykkaTestLogHandler
from tests.types import Events, Runtime
if TYPE_CHECKING:
from collections.abc import Iterator
from pykka import Actor, Future
RUNTIMES = {
"threading": pytest.param(
Runtime(
name="threading",
actor_class=ThreadingActor,
event_class=threading.Event,
future_class=ThreadingFuture,
sleep_func=time.sleep,
),
id="threading",
)
}
@pytest.fixture(scope="session", params=RUNTIMES.values())
def runtime(request: pytest.FixtureRequest) -> Runtime:
return cast(Runtime, request.param)
@pytest.fixture
def _stop_all() -> Iterator[None]: # pyright: ignore[reportUnusedFunction]
yield
ActorRegistry.stop_all()
@pytest.fixture
def log_handler() -> Iterator[logging.Handler]:
log_handler = PykkaTestLogHandler()
root_logger = logging.getLogger()
root_logger.addHandler(log_handler)
# pytest sets the root logger level to WARNING. We reset it to NOTSET
# so that all log messages reaches our log handler.
root_logger.setLevel(logging.NOTSET)
yield log_handler
log_handler.close()
@pytest.fixture
def events(runtime: Runtime) -> Events:
return Events(
on_start_was_called=runtime.event_class(),
on_stop_was_called=runtime.event_class(),
on_failure_was_called=runtime.event_class(),
greetings_was_received=runtime.event_class(),
actor_registered_before_on_start_was_called=runtime.event_class(),
)
@pytest.fixture(scope="module")
def early_failing_actor_class(runtime: Runtime) -> type[Actor]:
class EarlyFailingActor(runtime.actor_class): # type: ignore[name-defined]
def __init__(self, events: Events) -> None:
super().__init__()
self.events = events
def on_start(self) -> None:
try:
raise RuntimeError("on_start failure")
finally:
self.events.on_start_was_called.set()
return EarlyFailingActor
@pytest.fixture(scope="module")
def late_failing_actor_class(runtime: Runtime) -> type[Actor]:
class LateFailingActor(runtime.actor_class): # type: ignore[name-defined]
def __init__(self, events: Events) -> None:
super().__init__()
self.events = events
def on_start(self) -> None:
self.stop()
def on_stop(self) -> None:
try:
raise RuntimeError("on_stop failure")
finally:
self.events.on_stop_was_called.set()
return LateFailingActor
@pytest.fixture(scope="module")
def failing_on_failure_actor_class(runtime: Runtime) -> type[Actor]:
class FailingOnFailureActor(runtime.actor_class): # type: ignore[name-defined]
def __init__(self, events: Events) -> None:
super().__init__()
self.events = events
def on_receive(self, message: Any) -> Any:
if message.get("command") == "raise exception":
raise Exception("on_receive failure")
return super().on_receive(message)
def on_failure(self, *args: Any) -> None: # pyright: ignore[reportIncompatibleMethodOverride]
try:
raise RuntimeError("on_failure failure")
finally:
self.events.on_failure_was_called.set()
return FailingOnFailureActor
@pytest.fixture
def future(runtime: Runtime) -> Future[Any]:
return runtime.future_class()
@pytest.fixture
def futures(runtime: Runtime) -> list[Future[Any]]:
return [runtime.future_class() for _ in range(3)]
|