File: test_aio_low_level.py

package info (click to toggle)
dbus-fast 4.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,816 kB
  • sloc: python: 10,129; xml: 39; makefile: 29; sh: 5
file content (187 lines) | stat: -rw-r--r-- 5,737 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import asyncio

import pytest

from dbus_fast import Message, MessageFlag, MessageType
from dbus_fast.aio import MessageBus


@pytest.mark.asyncio
async def test_standard_interfaces():
    bus = await MessageBus().connect()
    msg = Message(
        destination="org.freedesktop.DBus",
        path="/org/freedesktop/DBus",
        interface="org.freedesktop.DBus",
        member="ListNames",
        serial=bus.next_serial(),
    )
    reply = await bus.call(msg)

    assert reply.message_type == MessageType.METHOD_RETURN
    assert reply.reply_serial == msg.serial
    assert reply.signature == "as"
    assert bus.unique_name in reply.body[0]

    msg.interface = "org.freedesktop.DBus.Introspectable"
    msg.member = "Introspect"
    msg.serial = bus.next_serial()

    reply = await bus.call(msg)
    assert reply.message_type == MessageType.METHOD_RETURN
    assert reply.reply_serial == msg.serial
    assert reply.signature == "s"
    assert type(reply.body[0]) is str

    msg.member = "MemberDoesNotExist"
    msg.serial = bus.next_serial()

    reply = await bus.call(msg)
    assert reply.message_type == MessageType.ERROR
    assert reply.reply_serial == msg.serial
    assert reply.error_name
    assert reply.signature == "s"
    assert type(reply.body[0]) is str

    bus.disconnect()
    await asyncio.wait_for(bus.wait_for_disconnect(), timeout=1)


@pytest.mark.asyncio
async def test_error_handling():
    bus = await MessageBus().connect()
    msg = Message(
        destination="org.freedesktop.DBus",
        path="/org/freedesktop/DBus",
        interface="org.freedesktop.DBus",
        member="InvalidMember",
        serial=bus.next_serial(),
    )
    reply = await bus.call(msg)

    assert reply.message_type == MessageType.ERROR
    assert reply.reply_serial == msg.serial
    assert reply.signature == "s"

    bus.disconnect()
    await asyncio.wait_for(bus.wait_for_disconnect(), timeout=1)


@pytest.mark.asyncio
async def test_sending_messages_between_buses():
    bus1 = await MessageBus().connect()
    bus2 = await MessageBus().connect()

    msg = Message(
        destination=bus1.unique_name,
        path="/org/test/path",
        interface="org.test.iface",
        member="SomeMember",
        serial=bus2.next_serial(),
    )

    def message_handler(sent):
        if sent.sender == bus2.unique_name and sent.serial == msg.serial:
            assert sent.path == msg.path
            assert sent.serial == msg.serial
            assert sent.interface == msg.interface
            assert sent.member == msg.member
            bus1.send(Message.new_method_return(sent, "s", ["got it"]))
            bus1.remove_message_handler(message_handler)
            return True

    bus1.add_message_handler(message_handler)

    reply = await bus2.call(msg)

    assert reply.message_type == MessageType.METHOD_RETURN
    assert reply.sender == bus1.unique_name
    assert reply.signature == "s"
    assert reply.body == ["got it"]
    assert reply.reply_serial == msg.serial

    def message_handler_error(sent):
        if sent.sender == bus2.unique_name and sent.serial == msg.serial:
            assert sent.path == msg.path
            assert sent.serial == msg.serial
            assert sent.interface == msg.interface
            assert sent.member == msg.member
            bus1.send(Message.new_error(sent, "org.test.Error", "throwing an error"))
            bus1.remove_message_handler(message_handler_error)
            return True

    bus1.add_message_handler(message_handler_error)

    msg.serial = bus2.next_serial()

    reply = await bus2.call(msg)

    assert reply.message_type == MessageType.ERROR
    assert reply.sender == bus1.unique_name
    assert reply.reply_serial == msg.serial
    assert reply.error_name == "org.test.Error"
    assert reply.signature == "s"
    assert reply.body == ["throwing an error"]

    msg.serial = bus2.next_serial()
    msg.flags = MessageFlag.NO_REPLY_EXPECTED

    with pytest.warns(DeprecationWarning):
        reply = await bus2.call(msg)
        assert reply is None

    reply = await bus2.send(msg)
    assert reply is None

    bus1.disconnect()
    bus2.disconnect()
    await asyncio.wait_for(bus1.wait_for_disconnect(), timeout=1)
    await asyncio.wait_for(bus2.wait_for_disconnect(), timeout=1)


@pytest.mark.asyncio
async def test_sending_signals_between_buses():
    bus1 = await MessageBus().connect()
    bus2 = await MessageBus().connect()

    add_match_msg = Message(
        destination="org.freedesktop.DBus",
        path="/org/freedesktop/DBus",
        interface="org.freedesktop.DBus",
        member="AddMatch",
        signature="s",
        body=[f"sender={bus2.unique_name}"],
    )

    await bus1.call(add_match_msg)

    async def wait_for_message():
        future = asyncio.get_running_loop().create_future()

        def message_handler(signal):
            if signal.sender == bus2.unique_name:
                bus1.remove_message_handler(message_handler)
                future.set_result(signal)

        bus1.add_message_handler(message_handler)
        return await future

    bus2.send(
        Message.new_signal(
            "/org/test/path", "org.test.interface", "SomeSignal", "s", ["a signal"]
        )
    )

    signal = await wait_for_message()

    assert signal.message_type == MessageType.SIGNAL
    assert signal.path == "/org/test/path"
    assert signal.interface == "org.test.interface"
    assert signal.member == "SomeSignal"
    assert signal.signature == "s"
    assert signal.body == ["a signal"]

    bus1.disconnect()
    bus2.disconnect()
    await asyncio.wait_for(bus1.wait_for_disconnect(), timeout=1)
    await asyncio.wait_for(bus2.wait_for_disconnect(), timeout=1)