File: test_aio.py

package info (click to toggle)
dbus-fast 3.1.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,804 kB
  • sloc: python: 9,997; xml: 39; makefile: 29; sh: 5
file content (28 lines) | stat: -rw-r--r-- 834 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio

import pytest

from dbus_fast import aio
from dbus_fast.service import ServiceInterface


class ExampleInterface(ServiceInterface):
    def __init__(self):
        super().__init__("test.interface")


@pytest.mark.asyncio
async def test_fast_disconnect():
    bus_name = "aio.client.test.methods"

    bus = await aio.MessageBus().connect()
    bus2 = await aio.MessageBus().connect()
    await bus.request_name(bus_name)
    service_interface = ExampleInterface()
    bus.export("/test/path", service_interface)
    introspection = await bus2.introspect(bus_name, "/test/path")
    bus2.get_proxy_object(bus_name, "/test/path", introspection)
    bus2.disconnect()
    bus.disconnect()
    await asyncio.wait_for(bus.wait_for_disconnect(), timeout=1)
    await asyncio.wait_for(bus2.wait_for_disconnect(), timeout=1)