1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
from dbus_next._private.unmarshaller import Unmarshaller
from dbus_next import Message, Variant, SignatureTree
import json
import os
import io
def print_buf(buf):
i = 0
while True:
p = buf[i:i + 8]
if not p:
break
print(p)
i += 8
# these messages have been verified with another library
table = json.load(open(os.path.dirname(__file__) + '/data/messages.json'))
# variants are an object in the json
def replace_variants(type_, item):
if type_.token == 'v' and type(item) is not Variant:
item = Variant(item['signature'],
replace_variants(SignatureTree(item['signature']).types[0], item['value']))
elif type_.token == 'a':
for i, item_child in enumerate(item):
if type_.children[0].token == '{':
for k, v in item.items():
item[k] = replace_variants(type_.children[0].children[1], v)
else:
item[i] = replace_variants(type_.children[0], item_child)
elif type_.token == '(':
for i, item_child in enumerate(item):
if type_.children[0].token == '{':
assert False
else:
item[i] = replace_variants(type_.children[i], item_child)
return item
def json_dump(what):
def dumper(obj):
try:
return obj.toJSON()
except Exception:
return obj.__dict__
return json.dumps(what, default=dumper, indent=2)
def test_marshalling_with_table():
for item in table:
message = Message(**item['message'])
body = []
for i, type_ in enumerate(message.signature_tree.types):
body.append(replace_variants(type_, message.body[i]))
message.body = body
buf = message._marshall()
data = bytes.fromhex(item['data'])
if buf != data:
print('message:')
print(json_dump(item['message']))
print('')
print('mine:')
print_buf(bytes(buf))
print('')
print('theirs:')
print_buf(data)
assert buf == data
def test_unmarshalling_with_table():
for item in table:
stream = io.BytesIO(bytes.fromhex(item['data']))
unmarshaller = Unmarshaller(stream)
try:
unmarshaller.unmarshall()
except Exception as e:
print('message failed to unmarshall:')
print(json_dump(item['message']))
raise e
message = Message(**item['message'])
body = []
for i, type_ in enumerate(message.signature_tree.types):
body.append(replace_variants(type_, message.body[i]))
message.body = body
for attr in [
'body', 'signature', 'message_type', 'destination', 'path', 'interface', 'member',
'flags', 'serial'
]:
assert getattr(unmarshaller.message,
attr) == getattr(message, attr), f'attr doesnt match: {attr}'
def test_ay_buffer():
body = [bytes(10000)]
msg = Message(path='/test', member='test', signature='ay', body=body)
marshalled = msg._marshall()
unmarshalled_msg = Unmarshaller(io.BytesIO(marshalled)).unmarshall()
assert unmarshalled_msg.body[0] == body[0]
|