1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
#!/usr/bin/env python3
import os
import sys
sys.path.append(os.path.abspath(os.path.dirname(__file__) + "/.."))
import asyncio
from typing import Annotated
from dbus_fast import Variant
from dbus_fast.aio.message_bus import MessageBus
from dbus_fast.annotations import DBusDict, DBusSignature, DBusStr
from dbus_fast.service import ServiceInterface, dbus_method, dbus_property, dbus_signal
class ExampleInterface(ServiceInterface):
def __init__(self, name: str) -> None:
super().__init__(name)
self._string_prop = "kevin"
@dbus_method()
def Echo(self, what: DBusStr) -> DBusStr:
return what
@dbus_method()
def EchoMultiple(
self, what1: DBusStr, what2: DBusStr
) -> Annotated[tuple[str, str], DBusSignature("ss")]:
return what1, what2
@dbus_method()
def GetVariantDict(self) -> DBusDict:
return {
"foo": Variant("s", "bar"),
"bat": Variant("x", -55),
"a_list": Variant("as", ["hello", "world"]),
}
@dbus_property(name="StringProp")
def string_prop(self) -> DBusStr:
return self._string_prop
@string_prop.setter
def string_prop_setter(self, val: DBusStr) -> None:
self._string_prop = val
@dbus_signal()
def signal_simple(self) -> DBusStr:
return "hello"
@dbus_signal()
def signal_multiple(
self,
) -> Annotated[tuple[str, str], DBusSignature("ss")]:
return "hello", "world"
async def main() -> None:
name = "dbus.next.example.service"
path = "/example/path"
interface_name = "example.interface"
bus = await MessageBus().connect()
interface = ExampleInterface(interface_name)
bus.export("/example/path", interface)
await bus.request_name(name)
print(
f'service up on name: "{name}", path: "{path}", interface: "{interface_name}"'
)
await bus.wait_for_disconnect()
asyncio.run(main())
|