File: test_tcp_address.py

package info (click to toggle)
python-dbus-next 0.2.3-5
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 696 kB
  • sloc: python: 6,018; makefile: 45; xml: 29
file content (67 lines) | stat: -rw-r--r-- 2,158 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from dbus_next.aio import MessageBus
from dbus_next import Message
from dbus_next._private.address import parse_address

import asyncio
import pytest
import os


@pytest.mark.asyncio
async def test_tcp_connection_with_forwarding(event_loop):
    closables = []
    host = '127.0.0.1'
    port = '55556'

    addr_info = parse_address(os.environ.get('DBUS_SESSION_BUS_ADDRESS'))
    assert addr_info
    assert 'abstract' in addr_info[0][1]
    path = f'\0{addr_info[0][1]["abstract"]}'

    async def handle_connection(tcp_reader, tcp_writer):
        unix_reader, unix_writer = await asyncio.open_unix_connection(path)
        closables.append(tcp_writer)
        closables.append(unix_writer)

        async def handle_read():
            while True:
                data = await tcp_reader.read(1)
                if not data:
                    break
                unix_writer.write(data)

        async def handle_write():
            while True:
                data = await unix_reader.read(1)
                if not data:
                    break
                tcp_writer.write(data)

        asyncio.run_coroutine_threadsafe(handle_read(), event_loop)
        asyncio.run_coroutine_threadsafe(handle_write(), event_loop)

    server = await asyncio.start_server(handle_connection, host, port)
    closables.append(server)

    bus = await MessageBus(bus_address=f'tcp:host={host},port={port}').connect()

    # basic tests to see if it works
    result = await bus.call(
        Message(destination='org.freedesktop.DBus',
                path='/org/freedesktop/DBus',
                interface='org.freedesktop.DBus.Peer',
                member='Ping'))
    assert result

    intr = await bus.introspect('org.freedesktop.DBus', '/org/freedesktop/DBus')
    obj = bus.get_proxy_object('org.freedesktop.DBus', '/org/freedesktop/DBus', intr)
    iface = obj.get_interface('org.freedesktop.DBus.Peer')
    await iface.call_ping()

    assert bus._sock.getpeername()[0] == host
    assert bus._sock.getsockname()[0] == host
    assert bus._sock.gettimeout() == 0
    assert bus._stream.closed is False

    for c in closables:
        c.close()