File: test_protocol.py

package info (click to toggle)
siobrultech-protocols 0.14.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 308 kB
  • sloc: python: 3,042; sh: 15; makefile: 7
file content (127 lines) | stat: -rw-r--r-- 5,083 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import asyncio
import logging
import sys
import unittest

from siobrultech_protocols.gem.packets import BIN48_NET, Packet
from siobrultech_protocols.gem.protocol import (
    ConnectionLostMessage,
    ConnectionMadeMessage,
    PacketProtocol,
    PacketProtocolMessage,
    PacketReceivedMessage,
)
from tests.gem.mock_transport import MockTransport
from tests.gem.packet_test_data import assert_packet, read_packet, read_packets

# Configure root logging at import time: DEBUG-level records go to stderr,
# each prefixed with a timestamp, the logger name and the level name.
logging.basicConfig(
    stream=sys.stderr,
    level=logging.DEBUG,
    format="%(asctime)s [%(name)s](%(levelname)s) %(message)s",
)


class TestPacketAccumulator(unittest.IsolatedAsyncioTestCase):
    """Exercise ``PacketProtocol`` packet handling against a ``MockTransport``.

    Every test feeds raw bytes to ``data_received`` and inspects the messages
    the protocol places on the queue it was constructed with.
    """

    def setUp(self):
        """Connect a fresh protocol and consume its connection-made message."""
        self._queue: asyncio.Queue[PacketProtocolMessage] = asyncio.Queue()
        self._transport = MockTransport()
        self._protocol = PacketProtocol(queue=self._queue)
        self._protocol.connection_made(self._transport)
        message = self._queue.get_nowait()
        # Plain ``assert isinstance`` is kept so static type checkers narrow
        # ``message`` before its attributes are read.
        assert isinstance(message, ConnectionMadeMessage)
        self.assertIs(message.protocol, self._protocol)

    def tearDown(self) -> None:
        """Report a lost connection (unless already closed), then close."""
        if not self._transport.closed:
            exc = Exception("Test")
            self._protocol.connection_lost(exc=exc)
            message = self._queue.get_nowait()
            assert isinstance(message, ConnectionLostMessage)
            self.assertIs(message.protocol, self._protocol)
            self.assertIs(message.exc, exc)
        # Close after connection_lost is not required, but at least should not crash
        self._protocol.close()

    def testClose(self):
        """Closing the protocol marks the transport as closed."""
        self._protocol.close()

        self.assertTrue(self._transport.closed)

    def test_single_packet(self):
        """A complete packet delivered in one call yields one packet message."""
        self._protocol.data_received(read_packet("BIN32-ABS.bin"))
        assert_packet("BIN32-ABS.bin", self.expect_packet_received())

    def test_header_only(self):
        """Nothing is emitted after only the first two bytes have arrived."""
        self._check_split_delivery("BIN32-ABS.bin", 2)

    def test_partial_packet(self):
        """Nothing is emitted after only the first 100 bytes have arrived."""
        self._check_split_delivery("BIN32-ABS.bin", 100)

    def test_time_packet(self):
        """The "tricky" BIN48-NET-TIME sample is decoded from a single chunk."""
        self._protocol.data_received(read_packet("BIN48-NET-TIME_tricky.bin"))
        assert_packet("BIN48-NET-TIME_tricky.bin", self.expect_packet_received())

    def test_partial_time_packet(self):
        """Split the "tricky" BIN48-NET-TIME sample at ``BIN48_NET.size`` bytes."""
        self._check_split_delivery("BIN48-NET-TIME_tricky.bin", BIN48_NET.size)

    def test_multiple_packets(self):
        """Several packets delivered in one chunk come out in order."""
        packet_files = [
            "BIN32-ABS.bin",
            "BIN32-NET.bin",
            "BIN48-NET.bin",
            "BIN48-NET-TIME.bin",
        ]
        self._protocol.data_received(read_packets(packet_files))
        for packet_file in packet_files:
            assert_packet(packet_file, self.expect_packet_received())

    def test_multiple_packets_with_junk(self):
        """Junk bytes between packets do not prevent the packets being emitted."""
        # (packet file, hex of junk sent right after it; None means no junk)
        deliveries = [
            ("BIN32-ABS.bin", "feff05"),
            ("BIN32-NET.bin", "feff01"),
            ("BIN48-NET.bin", "23413081afb134870dacea"),
            ("BIN48-NET-TIME.bin", None),
        ]
        for packet_file, junk_hex in deliveries:
            self._protocol.data_received(read_packet(packet_file))
            if junk_hex is not None:
                self._protocol.data_received(bytes.fromhex(junk_hex))
        for packet_file, _ in deliveries:
            assert_packet(packet_file, self.expect_packet_received())

    def expect_packet_received(self) -> Packet:
        """Pop the next queued message and return the packet it carries.

        Fails if the queue is empty, if the message is not a
        ``PacketReceivedMessage``, or if it does not reference this test's
        protocol instance.
        """
        message = self._queue.get_nowait()
        assert isinstance(message, PacketReceivedMessage)
        self.assertIs(message.protocol, self._protocol)
        return message.packet

    # Backward-compatible alias for the helper's original (misspelled) name.
    expect_packet_recieved = expect_packet_received

    def _assert_no_message(self) -> None:
        """Check that the protocol has not queued any message."""
        with self.assertRaises(asyncio.QueueEmpty):
            self._queue.get_nowait()

    def _check_split_delivery(self, packet_file: str, split_at: int) -> None:
        """Deliver a packet in two chunks; it must appear only after the second.

        ``packet_file`` names the sample to read and ``split_at`` is the byte
        offset at which the data is cut.
        """
        packet_data = read_packet(packet_file)
        self._protocol.data_received(packet_data[:split_at])
        self._assert_no_message()
        self._protocol.data_received(packet_data[split_at:])
        assert_packet(packet_file, self.expect_packet_received())


# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()