# File: test_request_name.py
#
# Package: python-dbus-next 0.2.3-5
#   - links: PTS, VCS
#   - area: main
#   - in suites: sid
#   - size: 696 kB
#   - sloc: python: 6,018; makefile: 45; xml: 29
# File content: 69 lines; stat: -rw-r--r-- 2,228 bytes
from dbus_next import aio, glib, Message, MessageType, NameFlag, RequestNameReply, ReleaseNameReply
from test.util import check_gi_repository, skip_reason_no_gi

import pytest

# Evaluated once at import time; the GLib test below is skipped when this is
# falsy. NOTE(review): presumably reports whether the GObject-introspection
# bindings needed by dbus_next.glib are available -- confirm in test.util.
has_gi = check_gi_repository()


@pytest.mark.asyncio
async def test_name_requests():
    """Check request_name/release_name replies across two asyncio connections.

    Two message bus connections compete for the same well-known name. The
    test asserts the reply code of every request/release call and the owner
    returned by the bus daemon's ``GetNameOwner`` method.

    Both connections are disconnected in a ``finally`` clause so that a
    failing assertion (or a call that raises) does not leave them open.
    """
    test_name = 'aio.test.request.name'

    bus1 = await aio.MessageBus().connect()
    bus2 = await aio.MessageBus().connect()

    async def get_name_owner(name):
        """Call ``org.freedesktop.DBus.GetNameOwner`` for *name* via ``bus1``.

        Asserts that the reply is a method return and hands back the first
        item of its body.
        """
        reply = await bus1.call(
            Message(destination='org.freedesktop.DBus',
                    path='/org/freedesktop/DBus',
                    interface='org.freedesktop.DBus',
                    member='GetNameOwner',
                    signature='s',
                    body=[name]))

        assert reply.message_type == MessageType.METHOD_RETURN
        return reply.body[0]

    try:
        # The first request is expected to make bus1 the primary owner;
        # repeating the same request is expected to report ALREADY_OWNER.
        reply = await bus1.request_name(test_name)
        assert reply == RequestNameReply.PRIMARY_OWNER
        reply = await bus1.request_name(test_name)
        assert reply == RequestNameReply.ALREADY_OWNER

        # bus2 asks for the name while bus1 holds it and is expected to be
        # placed in the queue.
        reply = await bus2.request_name(test_name, NameFlag.ALLOW_REPLACEMENT)
        assert reply == RequestNameReply.IN_QUEUE

        reply = await bus1.release_name(test_name)
        assert reply == ReleaseNameReply.RELEASED

        reply = await bus1.release_name('name.doesnt.exist')
        assert reply == ReleaseNameReply.NON_EXISTENT

        # A second release by bus1 is expected to be rejected as NOT_OWNER,
        # and GetNameOwner is expected to report bus2's unique name.
        reply = await bus1.release_name(test_name)
        assert reply == ReleaseNameReply.NOT_OWNER

        new_owner = await get_name_owner(test_name)
        assert new_owner == bus2.unique_name

        # With DO_NOT_QUEUE alone the request is expected to fail with EXISTS.
        reply = await bus1.request_name(test_name, NameFlag.DO_NOT_QUEUE)
        assert reply == RequestNameReply.EXISTS

        # Adding REPLACE_EXISTING is expected to hand the name back to bus1
        # (bus2 requested it with ALLOW_REPLACEMENT above).
        reply = await bus1.request_name(test_name, NameFlag.DO_NOT_QUEUE | NameFlag.REPLACE_EXISTING)
        assert reply == RequestNameReply.PRIMARY_OWNER
    finally:
        # Always close both connections, even when an assertion above fails.
        bus1.disconnect()
        bus2.disconnect()


@pytest.mark.skipif(not has_gi, reason=skip_reason_no_gi)
def test_request_name_glib():
    """Check the synchronous request/release name calls of the GLib bus.

    Requests a well-known name on a single GLib connection, expects to become
    its primary owner, then releases it and expects ``RELEASED``.

    The connection is disconnected in a ``finally`` clause so that a failing
    assertion (or a call that raises) does not leave it open.
    """
    test_name = 'glib.test.request.name'
    bus = glib.MessageBus().connect_sync()

    try:
        reply = bus.request_name_sync(test_name)
        assert reply == RequestNameReply.PRIMARY_OWNER

        reply = bus.release_name_sync(test_name)
        assert reply == ReleaseNameReply.RELEASED
    finally:
        # Always close the connection, even when an assertion above fails.
        bus.disconnect()