"""Unit tests for PacketProtocol: single, split, concatenated and junk-interleaved packet reads."""
import asyncio
import logging
import sys
import unittest
from siobrultech_protocols.gem.packets import BIN48_NET, Packet
from siobrultech_protocols.gem.protocol import (
ConnectionLostMessage,
ConnectionMadeMessage,
PacketProtocol,
PacketProtocolMessage,
PacketReceivedMessage,
)
from tests.gem.mock_transport import MockTransport
from tests.gem.packet_test_data import assert_packet, read_packet, read_packets
# Configure logging once at import time: DEBUG and above, written to stderr.
_LOG_FORMAT = "%(asctime)s [%(name)s](%(levelname)s) %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.DEBUG, stream=sys.stderr)
class TestPacketAccumulator(unittest.IsolatedAsyncioTestCase):
    """Exercise how PacketProtocol turns received bytes into queued packet messages.

    Every test feeds raw bytes to ``PacketProtocol.data_received`` and inspects
    the messages the protocol puts on the queue it was constructed with.
    """

    def setUp(self):
        """Connect a fresh protocol to a mock transport and consume the connect message."""
        self._queue: asyncio.Queue[PacketProtocolMessage] = asyncio.Queue()
        self._transport = MockTransport()
        self._protocol = PacketProtocol(queue=self._queue)
        self._protocol.connection_made(self._transport)
        message = self._queue.get_nowait()
        # The isinstance checks stay as bare asserts because they also narrow
        # the message type for static type checkers.
        assert isinstance(message, ConnectionMadeMessage)
        self.assertIs(message.protocol, self._protocol)

    def tearDown(self) -> None:
        """Report a lost connection and verify the resulting message.

        Skipped when the test itself already closed the transport.
        """
        if self._transport.closed:
            return
        exc = Exception("Test")
        self._protocol.connection_lost(exc=exc)
        message = self._queue.get_nowait()
        assert isinstance(message, ConnectionLostMessage)
        self.assertIs(message.protocol, self._protocol)
        self.assertIs(message.exc, exc)
        # Close after connection_lost is not required, but at least should not crash.
        # NOTE(review): kept on the guarded path so close() only ever follows
        # connection_lost; confirm this matches the intended layout.
        self._protocol.close()

    def testClose(self):
        """Closing the protocol must close the underlying transport."""
        self._protocol.close()
        self.assertTrue(self._transport.closed)

    def test_single_packet(self):
        """One complete packet delivered in a single read yields one packet message."""
        name = "BIN32-ABS.bin"
        self._protocol.data_received(read_packet(name))
        self._expect_packets(name)

    def test_header_only(self):
        """Nothing is emitted after only the first two bytes; the rest completes the packet."""
        self._assert_split_delivery("BIN32-ABS.bin", 2)

    def test_partial_packet(self):
        """Nothing is emitted after the first 100 bytes; the rest completes the packet."""
        self._assert_split_delivery("BIN32-ABS.bin", 100)

    def test_time_packet(self):
        """The "tricky" BIN48-NET-TIME sample is parsed when delivered in one read."""
        name = "BIN48-NET-TIME_tricky.bin"
        self._protocol.data_received(read_packet(name))
        self._expect_packets(name)

    def test_partial_time_packet(self):
        """The "tricky" BIN48-NET-TIME sample is parsed when split at ``BIN48_NET.size`` bytes."""
        self._assert_split_delivery("BIN48-NET-TIME_tricky.bin", BIN48_NET.size)

    def test_multiple_packets(self):
        """Several packets concatenated into one read are emitted individually, in order."""
        names = ["BIN32-ABS.bin", "BIN32-NET.bin", "BIN48-NET.bin", "BIN48-NET-TIME.bin"]
        self._protocol.data_received(read_packets(names))
        self._expect_packets(*names)

    def test_multiple_packets_with_junk(self):
        """Extra bytes received between packets do not prevent the packets from being emitted."""
        self._protocol.data_received(read_packet("BIN32-ABS.bin"))
        self._protocol.data_received(bytes.fromhex("feff05"))
        self._protocol.data_received(read_packet("BIN32-NET.bin"))
        self._protocol.data_received(bytes.fromhex("feff01"))
        self._protocol.data_received(read_packet("BIN48-NET.bin"))
        self._protocol.data_received(bytes.fromhex("23413081afb134870dacea"))
        self._protocol.data_received(read_packet("BIN48-NET-TIME.bin"))
        self._expect_packets(
            "BIN32-ABS.bin", "BIN32-NET.bin", "BIN48-NET.bin", "BIN48-NET-TIME.bin"
        )

    def expect_packet_received(self) -> Packet:
        """Pop the next queued message, check it is a packet message from this protocol, and return its packet.

        Raises:
            asyncio.QueueEmpty: if no message is waiting on the queue.
        """
        message = self._queue.get_nowait()
        assert isinstance(message, PacketReceivedMessage)
        self.assertIs(message.protocol, self._protocol)
        return message.packet

    # Backward-compatible alias for the helper's original (misspelled) name.
    expect_packet_recieved = expect_packet_received

    def _expect_packets(self, *names: str) -> None:
        """Check that the next queued packets match the named sample files, in order."""
        for name in names:
            assert_packet(name, self.expect_packet_received())

    def _assert_no_message(self) -> None:
        """Check that the protocol has not queued any message."""
        with self.assertRaises(asyncio.QueueEmpty):
            self._queue.get_nowait()

    def _assert_split_delivery(self, name: str, split_at: int) -> None:
        """Feed the named sample in two reads and check it is emitted only after the second.

        Args:
            name: sample file to load with ``read_packet``.
            split_at: number of leading bytes delivered in the first read.
        """
        packet_data = read_packet(name)
        self._protocol.data_received(packet_data[:split_at])
        self._assert_no_message()
        self._protocol.data_received(packet_data[split_at:])
        self._expect_packets(name)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|