File: test_base_node.py

package info (click to toggle)
pyartnet 2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 668 kB
  • sloc: python: 1,845; makefile: 5
file content (109 lines) | stat: -rw-r--r-- 3,292 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import re
from time import monotonic

import pytest
from tests.conftest import STEP_MS, TestingNode

from pyartnet.base import BaseUniverse
from pyartnet.base.channel import Channel
from pyartnet.base.network import MulticastNetworkTarget, UnicastNetworkTarget
from pyartnet.errors import DuplicateUniverseError


def test_repr(node: TestingNode) -> None:

    re_id = re.compile(r'(name=TestingNode-)[0-f]+')

    def _repr(obj: object) -> str:
        return re_id.sub(r'\g<1>123456', str(obj))

    # Unicast
    node = TestingNode(UnicastNetworkTarget(dst=('IP', 9999)))
    assert _repr(node) == '<TestingNode name=TestingNode-123456 network=Unicast(dst=IP:9999, source=None) universes=->'

    node.add_universe(9)
    assert _repr(node) == '<TestingNode name=TestingNode-123456 network=Unicast(dst=IP:9999, source=None) universe=9>'

    node.add_universe(2)
    assert (_repr(node) ==
            '<TestingNode name=TestingNode-123456 network=Unicast(dst=IP:9999, source=None) universes=2,9>')

    # Multicast
    node = TestingNode(MulticastNetworkTarget(src=('IP', 99999)))
    assert _repr(node) == '<TestingNode name=TestingNode-123456 network=Multicast(source=IP) universes=->'


def test_universe_add_get(node: TestingNode) -> None:

    u = node.add_universe()
    assert len(node) == 1
    assert node.get_universe(0) is u
    assert node[0] is u

    # Duplicate
    with pytest.raises(DuplicateUniverseError, match='BaseUniverse 0 does already exist!'):
        node.add_universe()

    # Check that the nodes are ascending
    node.add_universe(50)
    node.add_universe(3)

    assert len(node) == 3
    assert node._universes == (
        node.get_universe(0), node.get_universe(3), node.get_universe(50)
    )


async def test_fade_await(node: TestingNode, universe: BaseUniverse, caplog) -> None:
    async def check_no_wait_time_when_no_fade() -> None:
        start = monotonic()
        for _ in range(1000):
            assert not await node
        assert monotonic() - start < 0.001

    async def check_wait_time_when_fade(steps: int) -> None:
        start = monotonic()
        await node
        assert monotonic() - start >= ((steps - 1) * STEP_MS) / 1000

    caplog.set_level(0)

    channel = Channel(universe, 1, 1)

    await check_no_wait_time_when_no_fade()

    channel.set_fade([2], 2 * STEP_MS)
    assert channel.get_values() == [0]

    assert list(caplog.messages) == [
        'Added fade with 2 steps:',
        'CH 1: 000 -> 002 | step:  +1.0'
    ]

    assert channel._current_fade is not None
    await check_wait_time_when_fade(2)
    assert channel._current_fade is None
    assert channel.get_values() == [2]
    assert node.data == ['01', '02']

    await check_no_wait_time_when_no_fade()

    channel.set_fade([10], 2 * STEP_MS)

    assert channel._current_fade is not None
    await check_wait_time_when_fade(2)
    assert channel._current_fade is None
    assert node.data == ['01', '02', '06', '0a']

    await check_no_wait_time_when_no_fade()
    await node.wait_for_task_finish()


async def test_context(node: TestingNode) -> None:
    assert node._socket is None

    async with node:
        node._socket.close.assert_not_called()
        assert node._refresh_task.task is not None

    node._socket.close.assert_called()