File: server.py

package info (click to toggle)
dasbus 1.7-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 880 kB
  • sloc: python: 7,550; makefile: 101; sh: 4
file content (83 lines) | stat: -rw-r--r-- 2,391 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#
# Run the service org.example.Register.
#
from dasbus.loop import EventLoop
from dasbus.server.interface import dbus_interface
from dasbus.server.property import emits_properties_changed
from dasbus.server.template import InterfaceTemplate
from dasbus.signal import Signal
from dasbus.typing import Structure, List
from dasbus.xml import XMLGenerator
from common import SESSION_BUS, REGISTER, User, InvalidUser


@dbus_interface(REGISTER.interface_name)
class RegisterInterface(InterfaceTemplate):
    """The DBus interface of the user register."""

    def connect_signals(self):
        """Connect the signals."""
        self.watch_property("Users", self.implementation.users_changed)

    @property
    def Users(self) -> List[Structure]:
        """The list of users."""
        return User.to_structure_list(self.implementation.users)

    @emits_properties_changed
    def RegisterUser(self, user: Structure):
        """Register a new user."""
        self.implementation.register_user(User.from_structure(user))


class Register(object):
    """The implementation of the user register."""

    def __init__(self):
        self._users = []
        self._users_changed = Signal()

    @property
    def users(self):
        """The list of users."""
        return self._users

    @property
    def users_changed(self):
        """Signal the user list change."""
        return self._users_changed

    def register_user(self, user: User):
        """Register a new user."""
        if any(u for u in self.users if u.name == user.name):
            raise InvalidUser("User {} exists.".format(user.name))

        self._users.append(user)
        self._users_changed.emit()


if __name__ == "__main__":
    # Print the generated XML specification.
    print(XMLGenerator.prettify_xml(RegisterInterface.__dbus_xml__))

    try:
        # Create the register.
        register = Register()

        # Publish the register at /org/example/Register.
        SESSION_BUS.publish_object(
            REGISTER.object_path,
            RegisterInterface(register)
        )

        # Register the service name org.example.Register.
        SESSION_BUS.register_service(
            REGISTER.service_name
        )

        # Start the event loop.
        loop = EventLoop()
        loop.run()
    finally:
        # Unregister the DBus service and objects.
        SESSION_BUS.disconnect()