File: example-service.py

package info (click to toggle)
dbus-fast 3.1.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,804 kB
  • sloc: python: 9,997; xml: 39; makefile: 29; sh: 5
file content (67 lines) | stat: -rwxr-xr-x 1,696 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env python3
import os
import sys

sys.path.append(os.path.abspath(os.path.dirname(__file__) + "/.."))

import asyncio

from dbus_fast import Variant
from dbus_fast.aio.message_bus import MessageBus
from dbus_fast.service import ServiceInterface, dbus_method, dbus_property, dbus_signal


class ExampleInterface(ServiceInterface):
    """Example service interface with three methods, one property, two signals.

    The string annotations on the decorated members ("s", "ss", "a{sv}")
    are D-Bus type signatures consumed by the dbus_fast decorators, not
    Python types, so they must remain plain string literals.
    """
    def __init__(self, name):
        """Initialize the interface.

        Args:
            name: Interface name, forwarded to ``ServiceInterface.__init__``.
        """
        super().__init__(name)
        # Backing store for the ``StringProp`` property defined below.
        self._string_prop = "kevin"

    @dbus_method()
    def Echo(self, what: "s") -> "s":
        """Return ``what`` unchanged."""
        return what

    @dbus_method()
    def EchoMultiple(self, what1: "s", what2: "s") -> "ss":
        """Return both arguments, in order, as a two-element list."""
        return [what1, what2]

    @dbus_method()
    def GetVariantDict(self) -> "a{sv}":  # noqa: F722
        """Return a fixed dict mapping string keys to ``Variant`` values.

        Each ``Variant`` pairs a signature string with a value: a string
        ("s"), a negative integer ("x"), and a list of strings ("as").
        """
        return {
            "foo": Variant("s", "bar"),
            "bat": Variant("x", -55),
            "a_list": Variant("as", ["hello", "world"]),
        }

    @dbus_property(name="StringProp")
    def string_prop(self) -> "s":
        """Getter for the property registered under the name ``StringProp``."""
        return self._string_prop

    @string_prop.setter
    def string_prop_setter(self, val: "s"):
        """Setter for ``StringProp``: store ``val`` as the new value."""
        self._string_prop = val

    @dbus_signal()
    def signal_simple(self) -> "s":
        """Signal definition whose body value is the string "hello".

        NOTE(review): per the dbus_fast service docs, calling this method
        emits the signal with the returned value - confirm against the
        library version in use.
        """
        return "hello"

    @dbus_signal()
    def signal_multiple(self) -> "ss":
        """Signal definition whose body values are "hello" and "world"."""
        return ["hello", "world"]


async def main():
    """Connect to the message bus, export the example interface, and serve.

    Creates an ``ExampleInterface`` named ``interface_name``, exports it at
    ``path``, requests the bus name ``name``, prints a one-line status
    message, and then waits until the bus connection is disconnected.
    """
    name = "dbus.next.example.service"
    path = "/example/path"
    interface_name = "example.interface"

    bus = await MessageBus().connect()
    interface = ExampleInterface(interface_name)
    # Export at ``path`` instead of repeating the literal, so the status
    # line printed below can never drift from the path actually exported.
    bus.export(path, interface)
    await bus.request_name(name)
    print(
        f'service up on name: "{name}", path: "{path}", interface: "{interface_name}"'
    )
    await bus.wait_for_disconnect()


# Script entry point: run main() to completion via asyncio.run().
# There is no ``__main__`` guard, so importing this module also runs main().
asyncio.run(main())