1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
from dbus_next.aio import MessageBus
from dbus_next import Message, MessageType, MessageFlag
import pytest
@pytest.mark.asyncio
async def test_standard_interfaces():
bus = await MessageBus().connect()
msg = Message(destination='org.freedesktop.DBus',
path='/org/freedesktop/DBus',
interface='org.freedesktop.DBus',
member='ListNames',
serial=bus.next_serial())
reply = await bus.call(msg)
assert reply.message_type == MessageType.METHOD_RETURN
assert reply.reply_serial == msg.serial
assert reply.signature == 'as'
assert bus.unique_name in reply.body[0]
msg.interface = 'org.freedesktop.DBus.Introspectable'
msg.member = 'Introspect'
msg.serial = bus.next_serial()
reply = await bus.call(msg)
assert reply.message_type == MessageType.METHOD_RETURN
assert reply.reply_serial == msg.serial
assert reply.signature == 's'
assert type(reply.body[0]) is str
msg.member = 'MemberDoesNotExist'
msg.serial = bus.next_serial()
reply = await bus.call(msg)
assert reply.message_type == MessageType.ERROR
assert reply.reply_serial == msg.serial
assert reply.error_name
assert reply.signature == 's'
assert type(reply.body[0]) is str
@pytest.mark.asyncio
async def test_sending_messages_between_buses():
bus1 = await MessageBus().connect()
bus2 = await MessageBus().connect()
msg = Message(destination=bus1.unique_name,
path='/org/test/path',
interface='org.test.iface',
member='SomeMember',
serial=bus2.next_serial())
def message_handler(sent):
if sent.sender == bus2.unique_name and sent.serial == msg.serial:
assert sent.path == msg.path
assert sent.serial == msg.serial
assert sent.interface == msg.interface
assert sent.member == msg.member
bus1.send(Message.new_method_return(sent, 's', ['got it']))
bus1.remove_message_handler(message_handler)
return True
bus1.add_message_handler(message_handler)
reply = await bus2.call(msg)
assert reply.message_type == MessageType.METHOD_RETURN
assert reply.sender == bus1.unique_name
assert reply.signature == 's'
assert reply.body == ['got it']
assert reply.reply_serial == msg.serial
def message_handler_error(sent):
if sent.sender == bus2.unique_name and sent.serial == msg.serial:
assert sent.path == msg.path
assert sent.serial == msg.serial
assert sent.interface == msg.interface
assert sent.member == msg.member
bus1.send(Message.new_error(sent, 'org.test.Error', 'throwing an error'))
bus1.remove_message_handler(message_handler_error)
return True
bus1.add_message_handler(message_handler_error)
msg.serial = bus2.next_serial()
reply = await bus2.call(msg)
assert reply.message_type == MessageType.ERROR
assert reply.sender == bus1.unique_name
assert reply.reply_serial == msg.serial
assert reply.error_name == 'org.test.Error'
assert reply.signature == 's'
assert reply.body == ['throwing an error']
msg.serial = bus2.next_serial()
msg.flags = MessageFlag.NO_REPLY_EXPECTED
reply = await bus2.call(msg)
assert reply is None
@pytest.mark.asyncio
async def test_sending_signals_between_buses(event_loop):
bus1 = await MessageBus().connect()
bus2 = await MessageBus().connect()
add_match_msg = Message(destination='org.freedesktop.DBus',
path='/org/freedesktop/DBus',
interface='org.freedesktop.DBus',
member='AddMatch',
signature='s',
body=[f'sender={bus2.unique_name}'])
await bus1.call(add_match_msg)
async def wait_for_message():
future = event_loop.create_future()
def message_handler(signal):
if signal.sender == bus2.unique_name:
bus1.remove_message_handler(message_handler)
future.set_result(signal)
bus1.add_message_handler(message_handler)
return await future
bus2.send(
Message.new_signal('/org/test/path', 'org.test.interface', 'SomeSignal', 's', ['a signal']))
signal = await wait_for_message()
assert signal.message_type == MessageType.SIGNAL
assert signal.path == '/org/test/path'
assert signal.interface == 'org.test.interface'
assert signal.member == 'SomeSignal'
assert signal.signature == 's'
assert signal.body == ['a signal']
|