1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
|
import re
from time import monotonic
import pytest
from tests.conftest import STEP_MS, TestingNode
from pyartnet.base import BaseUniverse
from pyartnet.base.channel import Channel
from pyartnet.base.network import MulticastNetworkTarget, UnicastNetworkTarget
from pyartnet.errors import DuplicateUniverseError
def test_repr(node: TestingNode) -> None:
re_id = re.compile(r'(name=TestingNode-)[0-f]+')
def _repr(obj: object) -> str:
return re_id.sub(r'\g<1>123456', str(obj))
# Unicast
node = TestingNode(UnicastNetworkTarget(dst=('IP', 9999)))
assert _repr(node) == '<TestingNode name=TestingNode-123456 network=Unicast(dst=IP:9999, source=None) universes=->'
node.add_universe(9)
assert _repr(node) == '<TestingNode name=TestingNode-123456 network=Unicast(dst=IP:9999, source=None) universe=9>'
node.add_universe(2)
assert (_repr(node) ==
'<TestingNode name=TestingNode-123456 network=Unicast(dst=IP:9999, source=None) universes=2,9>')
# Multicast
node = TestingNode(MulticastNetworkTarget(src=('IP', 99999)))
assert _repr(node) == '<TestingNode name=TestingNode-123456 network=Multicast(source=IP) universes=->'
def test_universe_add_get(node: TestingNode) -> None:
u = node.add_universe()
assert len(node) == 1
assert node.get_universe(0) is u
assert node[0] is u
# Duplicate
with pytest.raises(DuplicateUniverseError, match='BaseUniverse 0 does already exist!'):
node.add_universe()
# Check that the nodes are ascending
node.add_universe(50)
node.add_universe(3)
assert len(node) == 3
assert node._universes == (
node.get_universe(0), node.get_universe(3), node.get_universe(50)
)
async def test_fade_await(node: TestingNode, universe: BaseUniverse, caplog) -> None:
async def check_no_wait_time_when_no_fade() -> None:
start = monotonic()
for _ in range(1000):
assert not await node
assert monotonic() - start < 0.001
async def check_wait_time_when_fade(steps: int) -> None:
start = monotonic()
await node
assert monotonic() - start >= ((steps - 1) * STEP_MS) / 1000
caplog.set_level(0)
channel = Channel(universe, 1, 1)
await check_no_wait_time_when_no_fade()
channel.set_fade([2], 2 * STEP_MS)
assert channel.get_values() == [0]
assert list(caplog.messages) == [
'Added fade with 2 steps:',
'CH 1: 000 -> 002 | step: +1.0'
]
assert channel._current_fade is not None
await check_wait_time_when_fade(2)
assert channel._current_fade is None
assert channel.get_values() == [2]
assert node.data == ['01', '02']
await check_no_wait_time_when_no_fade()
channel.set_fade([10], 2 * STEP_MS)
assert channel._current_fade is not None
await check_wait_time_when_fade(2)
assert channel._current_fade is None
assert node.data == ['01', '02', '06', '0a']
await check_no_wait_time_when_no_fade()
await node.wait_for_task_finish()
async def test_context(node: TestingNode) -> None:
assert node._socket is None
async with node:
node._socket.close.assert_not_called()
assert node._refresh_task.task is not None
node._socket.close.assert_called()
|