1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
from datetime import datetime, timedelta, timezone
import pytest
import msgpack
from socketio import msgpack_packet
from socketio import packet
class TestMsgPackPacket:
    """Encode/decode round-trip tests for ``msgpack_packet.MsgPackPacket``."""

    def test_encode_decode(self):
        """A CONNECT packet with auth data and a namespace round-trips."""
        sent = msgpack_packet.MsgPackPacket(
            packet.CONNECT,
            data={'auth': {'token': '123'}},
            namespace='/foo',
        )
        wire = sent.encode()
        received = msgpack_packet.MsgPackPacket(encoded_packet=wire)
        for field in ('packet_type', 'data', 'id', 'namespace'):
            assert getattr(sent, field) == getattr(received, field)

    def test_encode_decode_with_id(self):
        """An EVENT packet carrying an explicit id round-trips."""
        sent = msgpack_packet.MsgPackPacket(
            packet.EVENT,
            data=['ev', 42],
            id=123,
            namespace='/foo',
        )
        wire = sent.encode()
        received = msgpack_packet.MsgPackPacket(encoded_packet=wire)
        for field in ('packet_type', 'data', 'id', 'namespace'):
            assert getattr(sent, field) == getattr(received, field)

    def test_encode_binary_event_packet(self):
        """Bytes values inside an EVENT payload come back as bytes."""
        payload = {'foo': b'bar'}
        sent = msgpack_packet.MsgPackPacket(packet.EVENT, data=payload)
        assert sent.packet_type == packet.EVENT
        wire = sent.encode()
        received = msgpack_packet.MsgPackPacket(encoded_packet=wire)
        assert received.data == {'foo': b'bar'}

    def test_encode_binary_ack_packet(self):
        """Bytes values inside an ACK payload come back as bytes."""
        payload = {'foo': b'bar'}
        sent = msgpack_packet.MsgPackPacket(packet.ACK, data=payload)
        assert sent.packet_type == packet.ACK
        wire = sent.encode()
        received = msgpack_packet.MsgPackPacket(encoded_packet=wire)
        assert received.data == {'foo': b'bar'}

    def test_encode_with_dumps_default(self):
        """A ``dumps_default`` hook is applied to values that need it."""

        def default(obj):
            # Anything other than a datetime is rejected.
            if not isinstance(obj, datetime):
                raise TypeError('Unknown type')
            return obj.isoformat()

        data = {
            'current': datetime.now(tz=timezone(timedelta(0))),
            'key': 'value',
        }
        packet_cls = msgpack_packet.MsgPackPacket.configure(
            dumps_default=default)
        sent = packet_cls(data=data)
        wire = sent.encode()
        received = msgpack_packet.MsgPackPacket(encoded_packet=wire)

        for field in ('packet_type', 'id', 'namespace'):
            assert getattr(sent, field) == getattr(received, field)

        # The datetime was replaced by its ISO string, so the payloads differ.
        assert sent.data != received.data
        assert isinstance(received.data, dict)
        assert 'current' in received.data
        assert isinstance(received.data['current'], str)
        assert default(data['current']) == received.data['current']

        # Apart from the converted entry, the payload is unchanged.
        del data['current']
        remaining = {
            key: value
            for key, value in received.data.items()
            if key != 'current'
        }
        assert data == remaining

    def test_encode_without_dumps_default(self):
        """Encoding a datetime with no ``dumps_default`` raises TypeError."""
        data = {
            'current': datetime.now(tz=timezone(timedelta(0))),
            'key': 'value',
        }
        unconfigured = msgpack_packet.MsgPackPacket(data=data)
        with pytest.raises(TypeError):
            unconfigured.encode()

    def test_encode_decode_with_ext_hook(self):
        """A custom type round-trips when both hooks are configured."""

        class Custom:
            def __init__(self, value):
                self.value = value

            def __eq__(self, other: object) -> bool:
                if not isinstance(other, Custom):
                    return False
                return self.value == other.value

        def default(obj):
            if not isinstance(obj, Custom):
                raise TypeError('Unknown type')
            return msgpack.ExtType(1, obj.value)

        def ext_hook(code, data):
            if code != 1:
                raise TypeError('Unknown ext type')
            return Custom(data)

        data = {'custom': Custom(b'custom_data'), 'key': 'value'}
        encoder_cls = msgpack_packet.MsgPackPacket.configure(
            dumps_default=default)
        decoder_cls = msgpack_packet.MsgPackPacket.configure(
            ext_hook=ext_hook)
        sent = encoder_cls(data=data)
        received = decoder_cls(encoded_packet=sent.encode())

        for field in ('packet_type', 'id', 'data', 'namespace'):
            assert getattr(sent, field) == getattr(received, field)

    def test_encode_decode_without_ext_hook(self):
        """Without an ``ext_hook`` the value decodes as ``msgpack.ExtType``."""

        class Custom:
            def __init__(self, value):
                self.value = value

            def __eq__(self, other: object) -> bool:
                if not isinstance(other, Custom):
                    return False
                return self.value == other.value

        def default(obj):
            if not isinstance(obj, Custom):
                raise TypeError('Unknown type')
            return msgpack.ExtType(1, obj.value)

        data = {'custom': Custom(b'custom_data'), 'key': 'value'}
        encoder_cls = msgpack_packet.MsgPackPacket.configure(
            dumps_default=default)
        sent = encoder_cls(data=data)
        wire = sent.encode()
        received = msgpack_packet.MsgPackPacket(encoded_packet=wire)

        for field in ('packet_type', 'id', 'namespace'):
            assert getattr(sent, field) == getattr(received, field)

        # The decoder has no hook, so the raw ExtType is what comes back.
        assert sent.data != received.data
        assert isinstance(received.data, dict)
        assert 'custom' in received.data
        decoded_custom = received.data['custom']
        assert isinstance(decoded_custom, msgpack.ExtType)
        assert decoded_custom.code == 1
        assert decoded_custom.data == b'custom_data'

        # Apart from the custom entry, the payload is unchanged.
        del data['custom']
        remaining = {
            key: value
            for key, value in received.data.items()
            if key != 'custom'
        }
        assert data == remaining
|