1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
|
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, NoReturn, Optional
import pytest
from pykka import Actor
from tests.log_handler import LogLevel, PykkaTestLogHandler
if TYPE_CHECKING:
from collections.abc import Iterator
from types import TracebackType
from pykka import ActorRef
from tests.types import Events, Runtime
# Module-level pytest marker: request the `_stop_all` fixture for every test
# in this module. NOTE(review): `_stop_all` is not defined in this file
# (presumably it lives in conftest and stops leftover actors) -- confirm.
pytestmark = pytest.mark.usefixtures("_stop_all")
class LoggingActor(Actor):
    """Actor whose hooks signal test events and whose messages can raise.

    The ``events`` object passed to the constructor is stored on the actor;
    ``on_stop`` and ``on_failure`` set the matching event so a test can wait
    for the hook to have run.
    """

    def __init__(self, events: Events) -> None:
        super().__init__()
        self.events = events

    def on_stop(self) -> None:
        """Flag that the stop hook was invoked."""
        self.events.on_stop_was_called.set()

    def on_failure(
        self,
        exception_type: Optional[type[BaseException]],
        exception_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Flag that the failure hook was invoked; the arguments are unused."""
        self.events.on_failure_was_called.set()

    def on_receive(self, message: Any) -> Any:
        """Dispatch on the ``"command"`` entry of ``message``.

        ``"raise exception"`` delegates to :meth:`raise_exception`,
        ``"raise base exception"`` raises a bare ``BaseException``, and
        anything else is forwarded to the parent class implementation.
        """
        command = message.get("command")
        if command == "raise base exception":
            raise BaseException
        if command == "raise exception":
            return self.raise_exception()
        return super().on_receive(message)

    def raise_exception(self) -> NoReturn:
        """Always raise ``Exception("foo")``."""
        raise Exception("foo")
@pytest.fixture(scope="module")
def actor_class(runtime: Runtime) -> type[LoggingActor]:
    """Return a ``LoggingActor`` subclass bound to the runtime's actor class.

    The fixture is module-scoped, so the class is built once per module.
    """

    class LoggingActorImpl(LoggingActor, runtime.actor_class):  # type: ignore[name-defined]
        """``LoggingActor`` mixed with ``runtime.actor_class``."""

    return LoggingActorImpl
@pytest.fixture
def actor_ref(
    actor_class: type[LoggingActor],
    events: Events,
) -> Iterator[ActorRef[LoggingActor]]:
    """Start an actor of ``actor_class`` and yield its reference.

    The actor is constructed with ``events``; once the test is done with
    the reference, the actor is stopped.
    """
    started = actor_class.start(events)
    yield started
    # Teardown: runs after the test that requested this fixture.
    started.stop()
def test_null_handler_is_added_to_avoid_warnings() -> None:
    """The ``"pykka"`` logger must carry a handler named ``NullHandler``."""
    pykka_logger = logging.getLogger("pykka")
    assert any(
        handler.__class__.__name__ == "NullHandler"
        for handler in pykka_logger.handlers
    )
def test_unexpected_messages_are_logged(
    actor_ref: ActorRef[LoggingActor],
    log_handler: PykkaTestLogHandler,
) -> None:
    """A message the actor does not handle yields exactly one WARNING record."""
    actor_ref.ask({"unhandled": "message"})
    log_handler.wait_for_message(LogLevel.WARNING)

    # Read the captured records while holding the handler's lock.
    with log_handler.lock:
        warnings = log_handler.messages[LogLevel.WARNING]
        assert len(warnings) == 1
        log_record = warnings[0]

    # Only the text before the first ": " is compared.
    prefix, _, _ = log_record.getMessage().partition(": ")
    assert prefix == f"Unexpected message received by {actor_ref}"
def test_exception_is_logged_when_returned_to_caller(
    actor_ref: ActorRef[LoggingActor],
    log_handler: PykkaTestLogHandler,
) -> None:
    """An exception handed back to the caller is also logged once at INFO."""
    with pytest.raises(Exception, match="foo"):
        future = actor_ref.proxy().raise_exception()
        future.get()

    log_handler.wait_for_message(LogLevel.INFO)

    # Read the captured records while holding the handler's lock.
    with log_handler.lock:
        info_records = log_handler.messages[LogLevel.INFO]
        assert len(info_records) == 1
        log_record = info_records[0]

    expected_message = f"Exception returned from {actor_ref} to caller:"
    assert log_record.getMessage() == expected_message

    # The record must carry the original exception's type and message.
    assert log_record.exc_info
    exc_type, exc_value = log_record.exc_info[0], log_record.exc_info[1]
    assert exc_type is Exception
    assert str(exc_value) == "foo"
def test_exception_is_logged_when_not_reply_requested(
    actor_ref: ActorRef[LoggingActor],
    events: Events,
    log_handler: PykkaTestLogHandler,
) -> None:
    """An exception raised for a ``tell`` message is logged once at ERROR."""
    failure_event = events.on_failure_was_called
    failure_event.clear()

    actor_ref.tell({"command": "raise exception"})

    # Wait up to five seconds for the failure hook, then require it ran.
    failure_event.wait(5)
    assert failure_event.is_set()

    log_handler.wait_for_message(LogLevel.ERROR)

    # Read the captured records while holding the handler's lock.
    with log_handler.lock:
        error_records = log_handler.messages[LogLevel.ERROR]
        assert len(error_records) == 1
        log_record = error_records[0]

    assert log_record.getMessage() == f"Unhandled exception in {actor_ref}:"

    # The record must carry the original exception's type and message.
    assert log_record.exc_info
    exc_type, exc_value = log_record.exc_info[0], log_record.exc_info[1]
    assert exc_type is Exception
    assert str(exc_value) == "foo"
def test_base_exception_is_logged(
    actor_ref: ActorRef[LoggingActor],
    events: Events,
    log_handler: PykkaTestLogHandler,
) -> None:
    """A ``BaseException`` in the actor produces the expected DEBUG records."""
    log_handler.reset()
    stop_event = events.on_stop_was_called
    stop_event.clear()

    actor_ref.tell({"command": "raise base exception"})

    # Wait up to five seconds for the stop hook, then require it ran.
    stop_event.wait(5)
    assert stop_event.is_set()

    log_handler.wait_for_message(LogLevel.DEBUG, num_messages=3)

    # Read the captured records while holding the handler's lock.
    with log_handler.lock:
        debug_records = log_handler.messages[LogLevel.DEBUG]
        assert len(debug_records) == 3
        log_record = debug_records[0]

    # Only the first of the three DEBUG records is checked.
    expected_message = f"BaseException() in {actor_ref}. Stopping all actors."
    assert log_record.getMessage() == expected_message
def test_exception_in_on_start_is_logged(
    early_failing_actor_class: type[LoggingActor],
    events: Events,
    log_handler: PykkaTestLogHandler,
) -> None:
    """A failure during actor start-up is logged once at ERROR.

    NOTE(review): the actor class comes from the ``early_failing_actor_class``
    fixture, which is not visible here; it presumably raises in ``on_start``
    and sets ``events.on_start_was_called`` -- confirm against the fixture.
    """
    log_handler.reset()
    actor_ref = early_failing_actor_class.start(events)

    # Wait up to five seconds for the start hook. The wait result is checked
    # explicitly (as the sibling tests do) so a timeout fails here instead of
    # surfacing later as a confusing log-count mismatch.
    events.on_start_was_called.wait(5)
    assert events.on_start_was_called.is_set()

    log_handler.wait_for_message(LogLevel.ERROR)

    # Read the captured records while holding the handler's lock.
    with log_handler.lock:
        assert len(log_handler.messages[LogLevel.ERROR]) == 1
        log_record = log_handler.messages[LogLevel.ERROR][0]

    assert log_record.getMessage() == f"Unhandled exception in {actor_ref}:"
def test_exception_in_on_stop_is_logged(
    late_failing_actor_class: type[LoggingActor],
    events: Events,
    log_handler: PykkaTestLogHandler,
) -> None:
    """A failure during actor shutdown is logged once at ERROR.

    NOTE(review): the actor class comes from the ``late_failing_actor_class``
    fixture, which is not visible here; it presumably stops itself, raises in
    ``on_stop`` and sets ``events.on_stop_was_called`` -- confirm against the
    fixture.
    """
    log_handler.reset()
    actor_ref = late_failing_actor_class.start(events)

    # Wait up to five seconds for the stop hook. The wait result is checked
    # explicitly (as the sibling tests do) so a timeout fails here instead of
    # surfacing later as a confusing log-count mismatch.
    events.on_stop_was_called.wait(5)
    assert events.on_stop_was_called.is_set()

    log_handler.wait_for_message(LogLevel.ERROR)

    # Read the captured records while holding the handler's lock.
    with log_handler.lock:
        assert len(log_handler.messages[LogLevel.ERROR]) == 1
        log_record = log_handler.messages[LogLevel.ERROR][0]

    assert log_record.getMessage() == f"Unhandled exception in {actor_ref}:"
def test_exception_in_on_failure_is_logged(
    failing_on_failure_actor_class: type[LoggingActor],
    events: Events,
    log_handler: PykkaTestLogHandler,
) -> None:
    """A failure inside the failure hook itself is logged at ERROR.

    Two ERROR records are expected; only the first one's message is checked.

    NOTE(review): the actor class comes from the
    ``failing_on_failure_actor_class`` fixture, which is not visible here; it
    presumably raises in ``on_failure`` and sets
    ``events.on_failure_was_called`` -- confirm against the fixture.
    """
    log_handler.reset()
    actor_ref = failing_on_failure_actor_class.start(events)

    actor_ref.tell({"command": "raise exception"})

    # Wait up to five seconds for the failure hook. The wait result is checked
    # explicitly (as the sibling tests do) so a timeout fails here instead of
    # surfacing later as a confusing log-count mismatch.
    events.on_failure_was_called.wait(5)
    assert events.on_failure_was_called.is_set()

    log_handler.wait_for_message(LogLevel.ERROR, num_messages=2)

    # Read the captured records while holding the handler's lock.
    with log_handler.lock:
        assert len(log_handler.messages[LogLevel.ERROR]) == 2
        log_record = log_handler.messages[LogLevel.ERROR][0]

    assert log_record.getMessage() == f"Unhandled exception in {actor_ref}:"
|