1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
|
Examples
=======================
Asyncio client and server
++++++++++++++++++++++++++++++
In this example we create a simple server and a client.
There are 3 files:
* ``example_interface.py`` File that contains the interface definition.
* ``example_server.py`` Server.
* ``example_client.py`` Client.
``example_interface.py`` file: ::
from sdbus import (DbusInterfaceCommonAsync, dbus_method_async,
dbus_property_async, dbus_signal_async)
# This file only contains the interface definition, so that it can be
# easily imported in both the server and the client files
class ExampleInterface(
    DbusInterfaceCommonAsync,
    interface_name='org.example.interface'
):
    """Example interface with one method, one property and one signal."""

    @dbus_method_async(input_signature='s', result_signature='s')
    async def upper(self, string: str) -> str:
        """Return ``string`` converted to upper case."""
        return string.upper()

    @dbus_property_async(property_signature='s')
    def hello_world(self) -> str:
        """String property that always returns a fixed greeting."""
        return 'Hello, World!'

    @dbus_signal_async(signal_signature='i')
    def clock(self) -> int:
        """Signal with an integer payload (signature ``i``)."""
        raise NotImplementedError
``example_server.py`` file: ::
from asyncio import get_event_loop, sleep
from random import randint
from time import time
from example_interface import ExampleInterface
from sdbus import request_default_bus_name_async
# get_event_loop() no longer creates an event loop implicitly on
# recent Python versions, so create one explicitly and install it
# as the current loop.
from asyncio import new_event_loop, set_event_loop

loop = new_event_loop()
set_event_loop(loop)

# The object that will be exported to D-Bus
export_object = ExampleInterface()
async def clock() -> None:
    """
    Forever: sleep for a random number of seconds, then emit the
    ``clock`` signal with the current time.
    """
    while True:
        delay = randint(2, 7)  # Random pause between 2 and 7 seconds
        await sleep(delay)
        # The interface we defined uses integers, so drop the fraction
        now = int(time())
        export_object.clock.emit(now)
async def startup() -> None:
    """Acquire the bus name and export the object."""
    bus_name = 'org.example.test'
    object_path = '/'
    # Acquire a known name on the bus;
    # clients will use that name to address this server
    await request_default_bus_name_async(bus_name)
    # Export the object to D-Bus
    export_object.export_to_dbus(object_path)
# Run the startup coroutine to completion before scheduling anything else
loop.run_until_complete(startup())
# Schedule the clock coroutine and keep a reference to its task
task_clock = loop.create_task(clock())
# Run the event loop until it is stopped
loop.run_forever()
``example_client.py`` file: ::
from asyncio import get_event_loop
from example_interface import ExampleInterface
# Create a new proxy object for the object at path '/' of the
# 'org.example.test' service
example_object = ExampleInterface.new_proxy('org.example.test', '/')
async def print_clock() -> None:
    """Print every ``clock`` signal value as it is received."""
    # Iterating over the signal with ``async for`` yields each received value
    async for clock_value in example_object.clock:
        print('Got clock: ', clock_value)
async def call_upper() -> None:
    """Call the ``upper`` method and print the input next to the result."""
    original = 'test string'
    uppercased = await example_object.upper(original)
    print('Initial string: ', original)
    print('After call: ', uppercased)
async def get_hello_world() -> None:
    """Read the ``hello_world`` property and print its value."""
    greeting = await example_object.hello_world
    print('Remote property: ', greeting)
# get_event_loop() no longer creates an event loop implicitly on
# recent Python versions, so create one explicitly and install it
# as the current loop.
from asyncio import new_event_loop, set_event_loop

loop = new_event_loop()
set_event_loop(loop)

# Always bind your tasks to a variable: the event loop only keeps
# weak references to tasks, so an unreferenced task may be
# garbage collected before it finishes
task_upper = loop.create_task(call_upper())
task_clock = loop.create_task(print_clock())
task_hello_world = loop.create_task(get_hello_world())

# Run the event loop until it is stopped
loop.run_forever()
Start the server before the client: ``python example_server.py``
In a separate terminal, start the client: ``python example_client.py``
Use CTRL-C to close the client and the server.
You can also use :py:obj:`ExampleInterface` as a local object: ::
from asyncio import run
from example_interface import ExampleInterface
# Instantiate the interface class directly instead of creating a proxy
example_object = ExampleInterface()


async def test() -> None:
    """Call the method and read the property on the local object."""
    upper_result = await example_object.upper('test')
    print(upper_result)
    greeting = await example_object.hello_world
    print(greeting)


run(test())
|