"""Low-level tests that exchange hand-built D-Bus messages over asyncio ``MessageBus`` connections."""
import asyncio
import pytest
from dbus_fast import Message, MessageFlag, MessageType
from dbus_fast.aio import MessageBus
@pytest.mark.asyncio
async def test_standard_interfaces():
    """Call the bus daemon's standard interfaces with hand-built messages.

    One ``Message`` addressed to ``org.freedesktop.DBus`` is re-used for three
    calls; its interface, member and serial are updated between calls:

    1. ``org.freedesktop.DBus.ListNames`` -- expects a method return with
       signature ``as`` whose first body item contains this connection's
       unique name.
    2. ``org.freedesktop.DBus.Introspectable.Introspect`` -- expects a method
       return carrying a single string.
    3. ``MemberDoesNotExist`` (still on the Introspectable interface) --
       expects an error reply with a non-empty error name and a single
       string body.

    The connection is torn down in a ``finally`` block so that a failing
    assertion does not leave the bus connection open.
    """
    bus = await MessageBus().connect()
    try:
        msg = Message(
            destination="org.freedesktop.DBus",
            path="/org/freedesktop/DBus",
            interface="org.freedesktop.DBus",
            member="ListNames",
            serial=bus.next_serial(),
        )

        reply = await bus.call(msg)
        assert reply.message_type == MessageType.METHOD_RETURN
        assert reply.reply_serial == msg.serial
        assert reply.signature == "as"
        assert bus.unique_name in reply.body[0]

        # Re-target the same message at the Introspectable interface; a fresh
        # serial is taken so the reply can be matched to this call.
        msg.interface = "org.freedesktop.DBus.Introspectable"
        msg.member = "Introspect"
        msg.serial = bus.next_serial()

        reply = await bus.call(msg)
        assert reply.message_type == MessageType.METHOD_RETURN
        assert reply.reply_serial == msg.serial
        assert reply.signature == "s"
        assert isinstance(reply.body[0], str)

        # A member that does not exist must come back as an ERROR reply.
        msg.member = "MemberDoesNotExist"
        msg.serial = bus.next_serial()

        reply = await bus.call(msg)
        assert reply.message_type == MessageType.ERROR
        assert reply.reply_serial == msg.serial
        assert reply.error_name
        assert reply.signature == "s"
        assert isinstance(reply.body[0], str)
    finally:
        bus.disconnect()
        await asyncio.wait_for(bus.wait_for_disconnect(), timeout=1)
@pytest.mark.asyncio
async def test_error_handling():
    """Check that calling an unknown member yields an ERROR reply.

    Calls ``InvalidMember`` on ``org.freedesktop.DBus`` and asserts that the
    reply is of type ``ERROR``, echoes the request serial in ``reply_serial``
    and has signature ``s``.

    The connection is torn down in a ``finally`` block so that a failing
    assertion does not leave the bus connection open.
    """
    bus = await MessageBus().connect()
    try:
        msg = Message(
            destination="org.freedesktop.DBus",
            path="/org/freedesktop/DBus",
            interface="org.freedesktop.DBus",
            member="InvalidMember",
            serial=bus.next_serial(),
        )

        reply = await bus.call(msg)

        assert reply.message_type == MessageType.ERROR
        assert reply.reply_serial == msg.serial
        assert reply.signature == "s"
    finally:
        bus.disconnect()
        await asyncio.wait_for(bus.wait_for_disconnect(), timeout=1)
@pytest.mark.asyncio
async def test_sending_messages_between_buses():
    """Send method calls from one connection to another and check the replies.

    ``bus2`` calls ``SomeMember`` on ``bus1`` three ways:

    1. ``bus1`` answers with a method return carrying ``["got it"]``.
    2. ``bus1`` answers with the error ``org.test.Error``.
    3. The message is flagged ``NO_REPLY_EXPECTED``; both ``call`` and
       ``send`` are expected to resolve to ``None``.

    Both connections are torn down in a ``finally`` block so that a failing
    assertion does not leave them open.
    """
    bus1 = await MessageBus().connect()
    bus2 = await MessageBus().connect()
    try:
        msg = Message(
            destination=bus1.unique_name,
            path="/org/test/path",
            interface="org.test.iface",
            member="SomeMember",
            serial=bus2.next_serial(),
        )

        def message_handler(sent):
            # Only react to the call issued by bus2 below; anything else
            # falls through and the handler implicitly returns None.
            if sent.sender == bus2.unique_name and sent.serial == msg.serial:
                assert sent.path == msg.path
                assert sent.serial == msg.serial
                assert sent.interface == msg.interface
                assert sent.member == msg.member
                bus1.send(Message.new_method_return(sent, "s", ["got it"]))
                # One-shot handler: unregister once the call was answered.
                bus1.remove_message_handler(message_handler)
                # Presumably tells the bus the message was handled -- confirm
                # against the dbus_fast message-handler contract.
                return True

        bus1.add_message_handler(message_handler)

        reply = await bus2.call(msg)

        assert reply.message_type == MessageType.METHOD_RETURN
        assert reply.sender == bus1.unique_name
        assert reply.signature == "s"
        assert reply.body == ["got it"]
        assert reply.reply_serial == msg.serial

        def message_handler_error(sent):
            # Same matching as above, but answer with an error reply.
            if sent.sender == bus2.unique_name and sent.serial == msg.serial:
                assert sent.path == msg.path
                assert sent.serial == msg.serial
                assert sent.interface == msg.interface
                assert sent.member == msg.member
                bus1.send(
                    Message.new_error(sent, "org.test.Error", "throwing an error")
                )
                bus1.remove_message_handler(message_handler_error)
                return True

        bus1.add_message_handler(message_handler_error)

        # New serial so the error handler matches this call.
        msg.serial = bus2.next_serial()

        reply = await bus2.call(msg)

        assert reply.message_type == MessageType.ERROR
        assert reply.sender == bus1.unique_name
        assert reply.reply_serial == msg.serial
        assert reply.error_name == "org.test.Error"
        assert reply.signature == "s"
        assert reply.body == ["throwing an error"]

        msg.serial = bus2.next_serial()
        msg.flags = MessageFlag.NO_REPLY_EXPECTED

        # NOTE(review): the original indentation was lost, so the extent of
        # this ``pytest.warns`` block is inferred; it is taken to cover both
        # no-reply sends below -- confirm against the upstream test.
        with pytest.warns(DeprecationWarning):
            reply = await bus2.call(msg)
            assert reply is None

            reply = await bus2.send(msg)
            assert reply is None
    finally:
        bus1.disconnect()
        bus2.disconnect()
        await asyncio.wait_for(bus1.wait_for_disconnect(), timeout=1)
        await asyncio.wait_for(bus2.wait_for_disconnect(), timeout=1)
@pytest.mark.asyncio
async def test_sending_signals_between_buses():
    """Emit a signal on one connection and receive it on another.

    ``bus1`` registers a match rule for ``sender=<bus2 unique name>`` via
    ``org.freedesktop.DBus.AddMatch``; ``bus2`` then sends ``SomeSignal`` and
    the test asserts the message seen by ``bus1`` has the expected type,
    path, interface, member, signature and body.

    Both connections are torn down in a ``finally`` block so that a failing
    assertion does not leave them open.
    """
    bus1 = await MessageBus().connect()
    bus2 = await MessageBus().connect()
    try:
        add_match_msg = Message(
            destination="org.freedesktop.DBus",
            path="/org/freedesktop/DBus",
            interface="org.freedesktop.DBus",
            member="AddMatch",
            signature="s",
            body=[f"sender={bus2.unique_name}"],
        )

        await bus1.call(add_match_msg)

        async def wait_for_message():
            """Resolve with the first message bus1 receives from bus2."""
            future = asyncio.get_running_loop().create_future()

            def message_handler(signal):
                if signal.sender == bus2.unique_name:
                    # One-shot: unregister before resolving the future.
                    bus1.remove_message_handler(message_handler)
                    future.set_result(signal)

            bus1.add_message_handler(message_handler)
            return await future

        # NOTE(review): the handler is installed after send() is called; this
        # presumably relies on bus1 not dispatching the signal before
        # wait_for_message() reaches its first await -- confirm.
        bus2.send(
            Message.new_signal(
                "/org/test/path", "org.test.interface", "SomeSignal", "s", ["a signal"]
            )
        )

        signal = await wait_for_message()

        assert signal.message_type == MessageType.SIGNAL
        assert signal.path == "/org/test/path"
        assert signal.interface == "org.test.interface"
        assert signal.member == "SomeSignal"
        assert signal.signature == "s"
        assert signal.body == ["a signal"]
    finally:
        bus1.disconnect()
        bus2.disconnect()
        await asyncio.wait_for(bus1.wait_for_disconnect(), timeout=1)
        await asyncio.wait_for(bus2.wait_for_disconnect(), timeout=1)
|