File: util_dbus.py

package info (click to toggle)
displaycal-py3 3.9.16-1
  • links: PTS
  • area: main
  • in suites: forky, sid, trixie
  • size: 29,120 kB
  • sloc: python: 115,777; javascript: 11,540; xml: 598; sh: 257; makefile: 173
file content (159 lines) | stat: -rw-r--r-- 5,282 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# -*- coding: utf-8 -*-

import sys

# Prefer the GObject Introspection bindings (Gio/GLib). Reset to False below if
# they cannot be imported, in which case the ``dbus`` package is used instead.
USE_GI = True

# Default exception base. It stays ``Exception`` on "darwin" and "win32",
# where the block below is skipped; otherwise it is rebound to the error type
# of whichever binding was loaded.
DBusException = Exception

if sys.platform not in ("darwin", "win32"):
    if USE_GI:
        try:
            from gi.repository import Gio, GLib
        except ImportError:
            # Gio/GLib unavailable: fall back to the ``dbus`` package.
            USE_GI = False
    if not USE_GI:
        # Not guarded: an ImportError raised here propagates to the importer
        # of this module.
        import dbus
        from dbus.mainloop import glib
    from DisplayCAL.util_xml import XMLDict

    # Module-level connections to the session and system buses, opened at
    # import time and used by DBusObject.
    if USE_GI:
        dbus_session = Gio.bus_get_sync(Gio.BusType.SESSION, None)
        dbus_system = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)

        DBusException = GLib.Error
    else:
        glib.threads_init()

        dbus_session = dbus.SessionBus()
        dbus_system = dbus.SystemBus()

        DBusException = dbus.exceptions.DBusException


from DisplayCAL.util_str import safe_str


# Values for the ``bus_type`` argument of DBusObject: BUSTYPE_SESSION selects
# the session bus connection; any other value (BUSTYPE_SYSTEM by convention)
# selects the system bus connection.
BUSTYPE_SESSION = 1
BUSTYPE_SYSTEM = 2


class DBusObjectInterfaceMethod:
    """Callable that invokes one named method on a D-Bus interface object.

    Args:
        iface: Object exposing the remote methods as attributes. The lookup
            ``getattr(iface, method_name)`` is deferred until the call.
        method_name: Name of the attribute to call on ``iface``.
    """

    def __init__(self, iface, method_name):
        self._iface = iface
        self._method_name = method_name

    @staticmethod
    def _variant_type_code(arg):
        """Return the GVariant basic type code used to marshal ``arg``.

        Raises:
            TypeError: If ``arg`` is not a ``bool``, ``str``, ``int`` or
                ``float``.
        """
        # bool must be tested before int because bool is a subclass of int;
        # otherwise True/False would be tagged as an unsigned integer.
        if isinstance(arg, bool):
            return "b"
        if isinstance(arg, str):
            return "s"
        if isinstance(arg, int):
            # Negative values need the signed 32-bit code.
            return "i" if arg < 0 else "u"
        if isinstance(arg, float):
            # GVariant has no "f" code; floating point values are doubles.
            return "d"
        raise TypeError(f"Unsupported argument type: {type(arg)}")

    def __call__(self, *args, **kwargs):
        """Call the wrapped method and return its result.

        When ``USE_GI`` is true, a tuple signature string such as ``"(su)"``
        is derived from the positional arguments and passed as the first
        positional argument, followed by the original arguments. Otherwise
        the arguments are passed through unchanged.

        A ``timeout`` keyword of 500 is supplied unless the caller gives one.
        NOTE(review): the unit of ``timeout`` is defined by the underlying
        binding and may differ between the two backends - confirm.

        Raises:
            TypeError: If ``USE_GI`` is true and an argument has an
                unsupported type.
        """
        if USE_GI:
            signature = "".join(self._variant_type_code(arg) for arg in args)
            args = [f"({signature})", *args]
        kwargs.setdefault("timeout", 500)
        return getattr(self._iface, self._method_name)(*args, **kwargs)


class DBusObject:
    """Proxy for a D-Bus object whose methods are reachable as attributes.

    Accessing ``obj.some_method`` returns a callable bound to the D-Bus
    method ``SomeMethod`` of the object's interface.

    Args:
        bus_type: ``BUSTYPE_SESSION`` selects the session bus; any other
            value selects the system bus.
        bus_name: Bus name of the service owning the object.
        object_path: Path of the object. ``None`` derives it from
            ``bus_name`` by replacing dots with slashes and prefixing "/".
            Any other falsy value (e.g. "") skips creating a proxy.
        iface_name: Optional suffix appended to ``bus_name`` (joined with
            ".") to form the interface name.

    Raises:
        DBusObjectError: If creating the proxy raises ``TypeError``,
            ``ValueError`` or ``DBusException``.
    """

    def __init__(self, bus_type, bus_name, object_path=None, iface_name=None):
        self._bus_type = bus_type
        self._bus_name = bus_name
        if object_path is None:
            object_path = "/" + bus_name.replace(".", "/")
        self._object_path = object_path
        self._iface_name = iface_name
        self._proxy = None
        self._iface = None
        # Define every private attribute up front so that a lookup can never
        # fall through to __getattr__.
        self._bus = None
        self._introspectable = None
        if object_path:
            bus = dbus_session if bus_type == BUSTYPE_SESSION else dbus_system
            self._bus = bus
            interface = self._interface_name()
            try:
                if USE_GI:
                    self._proxy = Gio.DBusProxy.new_sync(
                        bus,
                        Gio.DBusProxyFlags.NONE,
                        None,
                        bus_name,
                        object_path,
                        interface,
                        None,
                    )
                    self._iface = self._proxy
                else:
                    self._proxy = bus.get_object(bus_name, object_path)
                    self._iface = dbus.Interface(self._proxy, dbus_interface=interface)
            except (TypeError, ValueError, DBusException) as exception:
                raise DBusObjectError(exception, self._bus_name) from exception

    def _interface_name(self):
        """Return ``bus_name``, with ``"." + iface_name`` appended when set."""
        if self._iface_name:
            return self._bus_name + "." + self._iface_name
        return self._bus_name

    @staticmethod
    def _dbus_method_name(name):
        """Convert a ``snake_case`` attribute name to ``CamelCase``.

        Only the first character of each underscore-separated part is
        upper-cased; the remainder is kept as written, so a name that is
        already CamelCase is returned unchanged.
        """
        return "".join(part[:1].upper() + part[1:] for part in name.split("_"))

    def __getattr__(self, name):
        """Return a callable for the D-Bus method matching ``name``.

        Raises:
            AttributeError: For names starting with an underscore. These are
                never D-Bus methods, and resolving them here would recurse
                endlessly when ``_iface`` itself is missing (for example on
                an instance created without running ``__init__``).
        """
        if name.startswith("_"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        # Only construction is guarded here; errors raised when the returned
        # callable is invoked propagate from that call unwrapped.
        try:
            return DBusObjectInterfaceMethod(self._iface, self._dbus_method_name(name))
        except (AttributeError, TypeError, ValueError, DBusException) as exception:
            raise DBusObjectError(exception, self._bus_name) from exception

    @property
    def properties(self):
        """Result of ``org.freedesktop.DBus.Properties.GetAll`` for the interface.

        Returns an empty dict when no proxy was created.

        Raises:
            DBusObjectError: If the call raises ``TypeError``, ``ValueError``
                or ``DBusException``.
        """
        if not self._proxy:
            return {}
        interface = self._interface_name()
        try:
            if USE_GI:
                iface = Gio.DBusProxy.new_sync(
                    self._bus,
                    Gio.DBusProxyFlags.NONE,
                    None,
                    self._bus_name,
                    self._object_path,
                    "org.freedesktop.DBus.Properties",
                    None,
                )
            else:
                iface = dbus.Interface(self._proxy, "org.freedesktop.DBus.Properties")
            return DBusObjectInterfaceMethod(iface, "GetAll")(interface)
        except (TypeError, ValueError, DBusException) as exception:
            raise DBusObjectError(exception, self._bus_name) from exception

    def introspect(self):
        """Call ``Introspect`` and return the XML result wrapped in ``XMLDict``.

        The helper object used for the call is created on first use and
        cached on the instance.
        """
        if not self._introspectable:
            # NOTE(review): the helper is addressed to the bus name
            # "org.freedesktop.DBus" with a path derived from this object's
            # bus name, not to this object's own bus name and object path -
            # confirm this is intended.
            self._introspectable = DBusObject(
                self._bus_type,
                "org.freedesktop.DBus",
                "/" + self._bus_name.replace(".", "/"),
                "Introspectable",
            )
        xml = self._introspectable.Introspect()
        return XMLDict(xml)


class DBusObjectError(DBusException):
    """Error raised by ``DBusObject`` in place of a lower-level exception.

    Args:
        exception: The exception (or message) being wrapped. If it provides
            ``get_dbus_name()``, the returned name is remembered.
        bus_name: Bus name appended to the message when the remembered name
            is ``org.freedesktop.DBus.Error.ServiceUnknown``.
    """

    def __init__(self, exception, bus_name=None):
        # Not every wrapped value exposes a D-Bus error name.
        if hasattr(exception, "get_dbus_name"):
            error_name = exception.get_dbus_name()
        else:
            error_name = None
        self._dbus_error_name = error_name

        message = exception
        if error_name == "org.freedesktop.DBus.Error.ServiceUnknown":
            # Say which service could not be found.
            message = "%s: %s" % (exception, bus_name)
        super().__init__(safe_str(message))

    def get_dbus_name(self):
        """Return the error name taken from the wrapped exception, or None."""
        return self._dbus_error_name