File: test_disconnect.py

package info (click to toggle)
python-dbus-next 0.2.3-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 696 kB
  • sloc: python: 6,018; makefile: 45; xml: 29
file content (56 lines) | stat: -rw-r--r-- 1,492 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from dbus_next.aio import MessageBus
from dbus_next import Message

import os
import pytest
import functools


@pytest.mark.asyncio
async def test_bus_disconnect_before_reply(event_loop):
    '''In this test, the bus disconnects before the reply comes in. Make sure
    the caller receives a reply with the error instead of hanging.'''
    bus = MessageBus()
    assert not bus.connected
    await bus.connect()
    assert bus.connected

    ping = bus.call(
        Message(destination='org.freedesktop.DBus',
                path='/org/freedesktop/DBus',
                interface='org.freedesktop.DBus',
                member='Ping'))

    event_loop.call_soon(bus.disconnect)

    with pytest.raises((EOFError, BrokenPipeError)):
        await ping

    assert bus._disconnected
    assert not bus.connected
    assert (await bus.wait_for_disconnect()) is None


@pytest.mark.asyncio
async def test_unexpected_disconnect(event_loop):
    bus = MessageBus()
    assert not bus.connected
    await bus.connect()
    assert bus.connected

    ping = bus.call(
        Message(destination='org.freedesktop.DBus',
                path='/org/freedesktop/DBus',
                interface='org.freedesktop.DBus',
                member='Ping'))

    event_loop.call_soon(functools.partial(os.close, bus._fd))

    with pytest.raises(OSError):
        await ping

    assert bus._disconnected
    assert not bus.connected

    with pytest.raises(OSError):
        await bus.wait_for_disconnect()