File: test_aio_low_level.py

package info (click to toggle)
python-dbus-next 0.2.3-6
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 704 kB
  • sloc: python: 6,018; makefile: 45; xml: 29
file content (138 lines) | stat: -rw-r--r-- 4,712 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from dbus_next.aio import MessageBus
from dbus_next import Message, MessageType, MessageFlag

import pytest


@pytest.mark.asyncio
async def test_standard_interfaces():
    """Call methods on the bus daemon itself and check the raw replies.

    Three calls are made with the same ``Message`` object, which is mutated
    and given a fresh serial between calls:

    1. ``org.freedesktop.DBus.ListNames`` -- expects a method return with
       signature ``as`` that contains this connection's unique name.
    2. ``org.freedesktop.DBus.Introspectable.Introspect`` -- expects a method
       return carrying a single string.
    3. A member that does not exist -- expects an error reply carrying a
       single string.
    """
    bus = await MessageBus().connect()
    try:
        msg = Message(destination='org.freedesktop.DBus',
                      path='/org/freedesktop/DBus',
                      interface='org.freedesktop.DBus',
                      member='ListNames',
                      serial=bus.next_serial())
        reply = await bus.call(msg)

        assert reply.message_type == MessageType.METHOD_RETURN
        assert reply.reply_serial == msg.serial
        assert reply.signature == 'as'
        assert bus.unique_name in reply.body[0]

        # Reuse the message for the introspection call; only the interface,
        # member and serial change.
        msg.interface = 'org.freedesktop.DBus.Introspectable'
        msg.member = 'Introspect'
        msg.serial = bus.next_serial()

        reply = await bus.call(msg)
        assert reply.message_type == MessageType.METHOD_RETURN
        assert reply.reply_serial == msg.serial
        assert reply.signature == 's'
        assert isinstance(reply.body[0], str)

        # An unknown member must come back as an ERROR message, not raise.
        msg.member = 'MemberDoesNotExist'
        msg.serial = bus.next_serial()

        reply = await bus.call(msg)
        assert reply.message_type == MessageType.ERROR
        assert reply.reply_serial == msg.serial
        assert reply.error_name
        assert reply.signature == 's'
        assert isinstance(reply.body[0], str)
    finally:
        # Always release the connection, even when an assertion fails, so the
        # socket does not leak into later tests.
        bus.disconnect()


@pytest.mark.asyncio
async def test_sending_messages_between_buses():
    """Send method calls from one connection to another and check replies.

    ``bus2`` calls a method on ``bus1`` three times with the same ``Message``
    object (a fresh serial each time):

    1. ``bus1`` answers with a method return carrying ``'got it'``.
    2. ``bus1`` answers with the error ``org.test.Error``.
    3. The message is flagged ``NO_REPLY_EXPECTED`` and ``call`` is expected
       to return ``None``.
    """
    buses = []
    try:
        # Connect inside the ``try`` so the first bus is still cleaned up if
        # the second connection attempt fails.
        for _ in range(2):
            buses.append(await MessageBus().connect())
        bus1, bus2 = buses

        msg = Message(destination=bus1.unique_name,
                      path='/org/test/path',
                      interface='org.test.iface',
                      member='SomeMember',
                      serial=bus2.next_serial())

        def message_handler(sent):
            # Only react to the call issued below; ignore unrelated traffic.
            if sent.sender == bus2.unique_name and sent.serial == msg.serial:
                assert sent.path == msg.path
                assert sent.serial == msg.serial
                assert sent.interface == msg.interface
                assert sent.member == msg.member
                bus1.send(Message.new_method_return(sent, 's', ['got it']))
                # One-shot handler: unregister once the call was answered.
                bus1.remove_message_handler(message_handler)
                return True

        bus1.add_message_handler(message_handler)

        reply = await bus2.call(msg)

        assert reply.message_type == MessageType.METHOD_RETURN
        assert reply.sender == bus1.unique_name
        assert reply.signature == 's'
        assert reply.body == ['got it']
        assert reply.reply_serial == msg.serial

        def message_handler_error(sent):
            # Same matching as above, but answer with an error reply.
            if sent.sender == bus2.unique_name and sent.serial == msg.serial:
                assert sent.path == msg.path
                assert sent.serial == msg.serial
                assert sent.interface == msg.interface
                assert sent.member == msg.member
                bus1.send(Message.new_error(sent, 'org.test.Error', 'throwing an error'))
                bus1.remove_message_handler(message_handler_error)
                return True

        bus1.add_message_handler(message_handler_error)

        # The handler compares against ``msg.serial`` when it runs, so the
        # serial must be refreshed before the call is made.
        msg.serial = bus2.next_serial()

        reply = await bus2.call(msg)

        assert reply.message_type == MessageType.ERROR
        assert reply.sender == bus1.unique_name
        assert reply.reply_serial == msg.serial
        assert reply.error_name == 'org.test.Error'
        assert reply.signature == 's'
        assert reply.body == ['throwing an error']

        # No handler is registered for this call; with NO_REPLY_EXPECTED the
        # caller should get ``None`` back.
        msg.serial = bus2.next_serial()
        msg.flags = MessageFlag.NO_REPLY_EXPECTED
        reply = await bus2.call(msg)
        assert reply is None
    finally:
        # Release every connection that was opened, even on failure.
        for bus in buses:
            bus.disconnect()


@pytest.mark.asyncio
async def test_sending_signals_between_buses(event_loop):
    """Emit a signal on one connection and receive it on another.

    ``bus1`` installs a match rule for everything sent by ``bus2``; ``bus2``
    then emits ``org.test.interface.SomeSignal`` and the test checks that the
    message seen by ``bus1`` has the expected type, path, interface, member,
    signature and body.

    :param event_loop: Event loop fixture, used to create the future that is
        resolved when the signal arrives.
    """
    buses = []
    try:
        # Connect inside the ``try`` so the first bus is still cleaned up if
        # the second connection attempt fails.
        for _ in range(2):
            buses.append(await MessageBus().connect())
        bus1, bus2 = buses

        add_match_msg = Message(destination='org.freedesktop.DBus',
                                path='/org/freedesktop/DBus',
                                interface='org.freedesktop.DBus',
                                member='AddMatch',
                                signature='s',
                                body=[f'sender={bus2.unique_name}'])

        await bus1.call(add_match_msg)

        async def wait_for_message():
            future = event_loop.create_future()

            def message_handler(signal):
                # Resolve on the first message from bus2, then unregister.
                if signal.sender == bus2.unique_name:
                    bus1.remove_message_handler(message_handler)
                    future.set_result(signal)

            bus1.add_message_handler(message_handler)
            return await future

        bus2.send(
            Message.new_signal('/org/test/path', 'org.test.interface', 'SomeSignal', 's',
                               ['a signal']))

        # There is no ``await`` between ``send`` above and the handler
        # registration inside ``wait_for_message``, so bus1 cannot process the
        # signal before the handler is in place.
        signal = await wait_for_message()

        assert signal.message_type == MessageType.SIGNAL
        assert signal.path == '/org/test/path'
        assert signal.interface == 'org.test.interface'
        assert signal.member == 'SomeSignal'
        assert signal.signature == 's'
        assert signal.body == ['a signal']
    finally:
        # Release every connection that was opened, even on failure.
        for bus in buses:
            bus.disconnect()