File: test_input.py

package info (click to toggle)
pypck 0.9.8-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 488 kB
  • sloc: python: 5,041; makefile: 15
file content (45 lines) | stat: -rw-r--r-- 1,577 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
"""Test the data flow for Input objects."""

from unittest.mock import patch

from pypck.inputs import Input, ModInput
from pypck.lcn_addr import LcnAddr

from .conftest import MockPchkConnectionManager


async def test_message_to_input(pypck_client: MockPchkConnectionManager) -> None:
    """Test data flow from message to input."""
    inp = Input()
    message = "dummy_message"
    with patch.object(
        pypck_client, "async_process_input"
    ) as pypck_client_process_input:
        with patch("pypck.inputs.InputParser.parse", return_value=[inp]) as inp_parse:
            await pypck_client.process_message(message)

        inp_parse.assert_called_with(message)
        pypck_client_process_input.assert_awaited_with(inp)


async def test_physical_to_logical_segment_id(
    pypck_client: MockPchkConnectionManager,
) -> None:
    """Test conversion from logical to physical segment id."""
    pypck_client.local_seg_id = 20
    module = pypck_client.get_device_connection(LcnAddr(20, 7, False))
    assert module.is_group is False
    with (
        patch("tests.conftest.MockPchkConnectionManager.is_ready", return_value=True),
        patch.object(module, "async_process_input") as module_process_input,
    ):
        inp = ModInput(LcnAddr(20, 7, False))
        await pypck_client.async_process_input(inp)

        inp = ModInput(LcnAddr(0, 7, False))
        await pypck_client.async_process_input(inp)

        inp = ModInput(LcnAddr(4, 7, False))
        await pypck_client.async_process_input(inp)

        assert module_process_input.await_count == 3