1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
#!/usr/bin/env python3
import sys
import os
sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/..'))
from dbus_next.service import ServiceInterface, method, signal, dbus_property
from dbus_next.aio.message_bus import MessageBus
from dbus_next import Variant
import asyncio
class ExampleInterface(ServiceInterface):
def __init__(self, name):
super().__init__(name)
self._string_prop = 'kevin'
@method()
def Echo(self, what: 's') -> 's':
return what
@method()
def EchoMultiple(self, what1: 's', what2: 's') -> 'ss':
return [what1, what2]
@method()
def GetVariantDict(self) -> 'a{sv}':
return {
'foo': Variant('s', 'bar'),
'bat': Variant('x', -55),
'a_list': Variant('as', ['hello', 'world'])
}
@dbus_property(name='StringProp')
def string_prop(self) -> 's':
return self._string_prop
@string_prop.setter
def string_prop_setter(self, val: 's'):
self._string_prop = val
@signal()
def signal_simple(self) -> 's':
return 'hello'
@signal()
def signal_multiple(self) -> 'ss':
return ['hello', 'world']
async def main():
name = 'dbus.next.example.service'
path = '/example/path'
interface_name = 'example.interface'
bus = await MessageBus().connect()
interface = ExampleInterface(interface_name)
bus.export('/example/path', interface)
await bus.request_name(name)
print(f'service up on name: "{name}", path: "{path}", interface: "{interface_name}"')
await bus.wait_for_disconnect()
asyncio.get_event_loop().run_until_complete(main())
|