File: conftest.py

package info (click to toggle)
python-pykka 4.2.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 520 kB
  • sloc: python: 2,817; makefile: 113
file content (138 lines) | stat: -rw-r--r-- 3,860 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from __future__ import annotations

import logging
import threading
import time
from typing import (
    TYPE_CHECKING,
    Any,
    cast,
)

import pytest

from pykka import ActorRegistry, ThreadingActor, ThreadingFuture
from tests.log_handler import PykkaTestLogHandler
from tests.types import Events, Runtime

if TYPE_CHECKING:
    from collections.abc import Iterator

    from pykka import Actor, Future


# Runtimes the test suite is parametrized over, keyed by runtime name.
# Each value is a ``pytest.param`` wrapping a ``Runtime`` description and
# using the runtime name as the parameter ID.
RUNTIMES = {
    "threading": pytest.param(
        Runtime(
            name="threading",
            # Classes a test should instantiate for this runtime.
            actor_class=ThreadingActor,
            future_class=ThreadingFuture,
            event_class=threading.Event,
            # Callable a test should use to sleep under this runtime.
            sleep_func=time.sleep,
        ),
        id="threading",
    ),
}


@pytest.fixture(scope="session", params=RUNTIMES.values())
def runtime(request: pytest.FixtureRequest) -> Runtime:
    """Return the ``Runtime`` selected by the current parametrization.

    The fixture is parametrized over every entry in ``RUNTIMES``.
    """
    selected = request.param
    # ``request.param`` is untyped; the cast only informs the type checker.
    return cast(Runtime, selected)


@pytest.fixture
def _stop_all() -> Iterator[None]:  # pyright: ignore[reportUnusedFunction]
    """Call ``ActorRegistry.stop_all()`` once the test has finished."""
    # No setup is needed; all the work happens during teardown.
    yield None
    ActorRegistry.stop_all()


@pytest.fixture
def log_handler() -> Iterator[logging.Handler]:
    """Install a ``PykkaTestLogHandler`` on the root logger for one test.

    The root logger level is lowered to ``NOTSET`` while the test runs.
    On teardown the handler is detached and closed, and the root logger's
    previous level is restored, so no logging state leaks between tests.

    Yields:
        The handler attached to the root logger.
    """
    handler = PykkaTestLogHandler()

    root_logger = logging.getLogger()
    previous_level = root_logger.level
    root_logger.addHandler(handler)
    # pytest sets the root logger level to WARNING. We reset it to NOTSET
    # so that all log messages reach our log handler.
    root_logger.setLevel(logging.NOTSET)

    try:
        yield handler
    finally:
        # Closing a handler does not detach it from the loggers it was added
        # to, so remove it explicitly; otherwise every test using this
        # fixture would leave another closed handler on the root logger.
        root_logger.removeHandler(handler)
        root_logger.setLevel(previous_level)
        handler.close()


@pytest.fixture
def events(runtime: Runtime) -> Events:
    """Build an ``Events`` bundle with a separate event object per field.

    Each event is created by calling ``runtime.event_class``.
    """
    make_event = runtime.event_class
    return Events(
        on_start_was_called=make_event(),
        on_stop_was_called=make_event(),
        on_failure_was_called=make_event(),
        greetings_was_received=make_event(),
        actor_registered_before_on_start_was_called=make_event(),
    )


@pytest.fixture(scope="module")
def early_failing_actor_class(runtime: Runtime) -> type[Actor]:
    """Return an actor class whose ``on_start`` hook always fails.

    The class derives from ``runtime.actor_class``. Its ``on_start`` sets
    ``events.on_start_was_called`` and raises ``RuntimeError``.
    """

    class EarlyFailingActor(runtime.actor_class):  # type: ignore[name-defined]
        def __init__(self, events: Events) -> None:
            super().__init__()
            self.events = events

        def on_start(self) -> None:
            # Signal that the hook ran, then fail unconditionally.
            self.events.on_start_was_called.set()
            raise RuntimeError("on_start failure")

    return EarlyFailingActor


@pytest.fixture(scope="module")
def late_failing_actor_class(runtime: Runtime) -> type[Actor]:
    """Return an actor class whose ``on_stop`` hook always fails.

    The class derives from ``runtime.actor_class``. Its ``on_start`` calls
    ``self.stop()``, and its ``on_stop`` sets ``events.on_stop_was_called``
    and raises ``RuntimeError``.
    """

    class LateFailingActor(runtime.actor_class):  # type: ignore[name-defined]
        def __init__(self, events: Events) -> None:
            super().__init__()
            self.events = events

        def on_start(self) -> None:
            # Ask the actor to stop as soon as it has started.
            self.stop()

        def on_stop(self) -> None:
            # Signal that the hook ran, then fail unconditionally.
            self.events.on_stop_was_called.set()
            raise RuntimeError("on_stop failure")

    return LateFailingActor


@pytest.fixture(scope="module")
def failing_on_failure_actor_class(runtime: Runtime) -> type[Actor]:
    """Return an actor class whose ``on_failure`` hook itself fails.

    The class derives from ``runtime.actor_class``. A message whose
    ``"command"`` is ``"raise exception"`` makes ``on_receive`` raise; any
    other message is passed to the base class. ``on_failure`` sets
    ``events.on_failure_was_called`` and raises ``RuntimeError``.
    """

    class FailingOnFailureActor(runtime.actor_class):  # type: ignore[name-defined]
        def __init__(self, events: Events) -> None:
            super().__init__()
            self.events = events

        def on_receive(self, message: Any) -> Any:
            command = message.get("command")
            if command != "raise exception":
                # Anything else is handled by the base implementation.
                return super().on_receive(message)
            raise Exception("on_receive failure")

        def on_failure(self, *args: Any) -> None:  # pyright: ignore[reportIncompatibleMethodOverride]
            # Signal that the hook ran, then fail unconditionally.
            self.events.on_failure_was_called.set()
            raise RuntimeError("on_failure failure")

    return FailingOnFailureActor


@pytest.fixture
def future(runtime: Runtime) -> Future[Any]:
    """Return a new future created by ``runtime.future_class``."""
    make_future = runtime.future_class
    return make_future()


@pytest.fixture
def futures(runtime: Runtime) -> list[Future[Any]]:
    """Return a list of three new futures from ``runtime.future_class``."""
    make_future = runtime.future_class
    count = 3
    return [make_future() for _ in range(count)]