File: test_request_name.py

package info (click to toggle)
dbus-fast 3.1.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,804 kB
  • sloc: python: 9,997; xml: 39; makefile: 29; sh: 5
file content (91 lines) | stat: -rw-r--r-- 2,582 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import asyncio
import sys

import pytest

from dbus_fast import (
    Message,
    MessageType,
    NameFlag,
    ReleaseNameReply,
    RequestNameReply,
    aio,
    glib,
)
from tests.util import check_gi_repository, skip_reason_no_gi

# Evaluated once at import time; the GLib test below is skipped via
# ``skipif(not has_gi, ...)`` when this is falsy.
# NOTE(review): presumably reports whether the ``gi`` (PyGObject) bindings
# are importable -- confirm against tests.util.check_gi_repository.
has_gi = check_gi_repository()


@pytest.mark.asyncio
async def test_name_requests():
    """Exercise name request/release replies across two asyncio connections.

    One well-known name is walked through the sequence asserted below:
    first request, repeated request by the owner, a second connection being
    queued, release by the owner, release of a never-requested name, release
    by a non-owner, hand-over to the queued connection, a refused
    non-queueing request, and finally a replacing request.

    Needs a reachable D-Bus message bus. Every connection that was opened is
    disconnected in a ``finally`` block, so a failing assertion (or a failed
    second connect) does not leak connections into later tests.
    """
    test_name = "aio.test.request.name"
    # Connections opened so far; only these are torn down in ``finally``.
    connected = []

    try:
        bus1 = await aio.MessageBus().connect()
        connected.append(bus1)
        bus2 = await aio.MessageBus().connect()
        connected.append(bus2)

        async def get_name_owner(name):
            """Ask the bus daemon (via ``bus1``) who owns ``name``."""
            reply = await bus1.call(
                Message(
                    destination="org.freedesktop.DBus",
                    path="/org/freedesktop/DBus",
                    interface="org.freedesktop.DBus",
                    member="GetNameOwner",
                    signature="s",
                    body=[name],
                )
            )

            assert reply.message_type == MessageType.METHOD_RETURN
            return reply.body[0]

        # bus1 acquires the name; asking again reports it already owns it.
        reply = await bus1.request_name(test_name)
        assert reply == RequestNameReply.PRIMARY_OWNER
        reply = await bus1.request_name(test_name)
        assert reply == RequestNameReply.ALREADY_OWNER

        # bus2 is queued behind bus1, and flags that it may be replaced later.
        reply = await bus2.request_name(test_name, NameFlag.ALLOW_REPLACEMENT)
        assert reply == RequestNameReply.IN_QUEUE

        reply = await bus1.release_name(test_name)
        assert reply == ReleaseNameReply.RELEASED

        reply = await bus1.release_name("name.doesnt.exist")
        assert reply == ReleaseNameReply.NON_EXISTENT

        # Second release of the same name: bus1 no longer owns it.
        reply = await bus1.release_name(test_name)
        assert reply == ReleaseNameReply.NOT_OWNER

        # The queued connection has taken over ownership.
        new_owner = await get_name_owner(test_name)
        assert new_owner == bus2.unique_name

        # Without REPLACE_EXISTING the request is refused outright...
        reply = await bus1.request_name(test_name, NameFlag.DO_NOT_QUEUE)
        assert reply == RequestNameReply.EXISTS

        # ...with it, bus1 takes the name back from bus2.
        reply = await bus1.request_name(
            test_name, NameFlag.DO_NOT_QUEUE | NameFlag.REPLACE_EXISTING
        )
        assert reply == RequestNameReply.PRIMARY_OWNER
    finally:
        # Request every disconnect first, then wait for each, as the original
        # teardown did.
        for bus in connected:
            bus.disconnect()
        for bus in connected:
            await asyncio.wait_for(bus.wait_for_disconnect(), timeout=1)


@pytest.mark.skipif(
    sys.version_info.minor in (10, 11, 12, 13),
    reason="segfaults on py3.10,py3.11,py3.12,py3.13",
)
@pytest.mark.skipif(not has_gi, reason=skip_reason_no_gi)
def test_request_name_glib():
    """Request and release a name through the synchronous GLib message bus.

    Skipped on the Python minor versions listed in the first ``skipif`` and
    when ``has_gi`` is falsy. Needs a reachable D-Bus message bus. The
    connection is disconnected in a ``finally`` block so a failing assertion
    does not leave it open.
    """
    test_name = "glib.test.request.name"
    bus = glib.MessageBus().connect_sync()

    try:
        reply = bus.request_name_sync(test_name)
        assert reply == RequestNameReply.PRIMARY_OWNER

        reply = bus.release_name_sync(test_name)
        assert reply == ReleaseNameReply.RELEASED
    finally:
        bus.disconnect()