File: conftest.py

package info (click to toggle)
pypck 0.8.10-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 528 kB
  • sloc: python: 5,616; makefile: 15
file content (118 lines) | stat: -rw-r--r-- 3,587 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""Core testing functionality."""

import asyncio
from collections.abc import AsyncGenerator
from typing import Any

import pytest

from pypck.connection import PchkConnectionManager
from pypck.lcn_addr import LcnAddr
from pypck.module import ModuleConnection
from pypck.pck_commands import PckGenerator

from .mock_pchk import MockPchkServer

HOST = "127.0.0.1"
PORT = 4114
USERNAME = "lcn_username"
PASSWORD = "lcn_password"


class MockPchkConnectionManager(PchkConnectionManager):
    """Mock the PchkConnectionManager."""

    def __init__(self, *args: Any, **kwargs: Any):
        """Construct mock for PchkConnectionManager."""
        self.data_received: list[str] = []
        super().__init__(*args, **kwargs)

    async def process_message(self, message: str) -> None:
        """Process incoming message."""
        await super().process_message(message)
        self.data_received.append(message)

    async def received(
        self, message: str, timeout: int = 5, remove: bool = True
    ) -> bool:
        """Return if given message was received."""

        async def receive_loop(data: str, remove: bool) -> None:
            while data not in self.data_received:
                await asyncio.sleep(0.05)
            if remove:
                self.data_received.remove(data)

        try:
            await asyncio.wait_for(receive_loop(message, remove), timeout=timeout)
            return True
        except asyncio.TimeoutError:
            return False


def encode_pck(pck: str) -> bytes:
    """Encode the given PCK string as PCK binary string."""
    return (pck + PckGenerator.TERMINATION).encode()


@pytest.fixture
async def pchk_server() -> AsyncGenerator[MockPchkServer, None]:
    """Create a fake PchkServer and run."""
    pchk_server = MockPchkServer(
        host=HOST, port=PORT, username=USERNAME, password=PASSWORD
    )
    await pchk_server.run()
    yield pchk_server
    await pchk_server.stop()


@pytest.fixture
async def pypck_client() -> AsyncGenerator[PchkConnectionManager, None]:
    """Create a PchkConnectionManager for testing.

    Create a PchkConnection Manager for testing. Add a received coroutine method
    which returns if the specified message was received (and processed).
    """
    pcm = MockPchkConnectionManager(
        HOST, PORT, USERNAME, PASSWORD, settings={"SK_NUM_TRIES": 0}
    )
    yield pcm
    await pcm.async_close()
    assert len(pcm.task_registry.tasks) == 0


@pytest.fixture
async def module10(
    pypck_client: PchkConnectionManager,
) -> AsyncGenerator[ModuleConnection, None]:
    """Create test module with addr_id 10."""
    lcn_addr = LcnAddr(0, 10, False)
    module = pypck_client.get_module_conn(lcn_addr)
    yield module
    await module.cancel_requests()


@pytest.fixture
async def pchk_server_2() -> AsyncGenerator[MockPchkServer, None]:
    """Create a fake PchkServer and run."""
    pchk_server = MockPchkServer(
        host=HOST, port=PORT + 1, username=USERNAME, password=PASSWORD
    )
    await pchk_server.run()
    yield pchk_server
    await pchk_server.stop()


@pytest.fixture
async def pypck_client_2() -> AsyncGenerator[PchkConnectionManager, None]:
    """Create a PchkConnectionManager for testing.

    Create a PchkConnection Manager for testing. Add a received coroutine method
    which returns if the specified message was received (and processed).
    """
    pcm = MockPchkConnectionManager(
        HOST, PORT + 1, USERNAME, PASSWORD, settings={"SK_NUM_TRIES": 0}
    )
    yield pcm
    await pcm.async_close()
    assert len(pcm.task_registry.tasks) == 0