File: NetHadessPowerProfiles.py

package info (click to toggle)
powerdevil 4%3A6.5.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,680 kB
  • sloc: cpp: 13,284; xml: 1,911; python: 1,204; sh: 19; makefile: 10
file content (160 lines) | stat: -rw-r--r-- 8,287 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2024 Fushan Wen <qydwhotmail@gmail.com>
# SPDX-License-Identifier: GPL-3.0-or-later

# pylint: disable=too-many-arguments

import logging
import os
import threading
from typing import Any, Final

from gi.repository import Gio, GLib

current_folder: Final = os.path.dirname(os.path.abspath(__file__))


class NetHadessPowerProfiles:
    """
    Canned D-Bus service exposing the org.freedesktop.UPower.PowerProfiles
    interface at /org/freedesktop/UPower/PowerProfiles on the system bus.

    Constructing an instance requests ownership of BUS_NAME. Once the bus
    connection is handed over, the object is registered and `registered_event`
    is set. Property values are kept in `ppd_properties` and can be changed
    either over D-Bus or by calling the public methods directly; every change
    is announced with a PropertiesChanged signal.
    """

    BUS_NAME: Final = "org.freedesktop.UPower.PowerProfiles"
    OBJECT_PATH: Final = "/org/freedesktop/UPower/PowerProfiles"
    IFACE_NAME: Final = "org.freedesktop.UPower.PowerProfiles"

    __connection: Gio.DBusConnection

    def __init__(self) -> None:
        """
        Set up the initial property values and request ownership of BUS_NAME
        on the system bus.
        """
        self.__ppd_reg_id: int = 0
        # Every value is a GLib.Variant except "ActiveProfileHolds", which is
        # kept as a plain {cookie: hold} dict and converted on demand by
        # active_profile_holds().
        self.ppd_properties: dict[str, Any] = {
            "ActiveProfile": GLib.Variant("s", "balanced"),
            "PerformanceInhibited": GLib.Variant("s", ""),  # deprecated
            "PerformanceDegraded": GLib.Variant("s", ""),
            "Profiles": GLib.Variant("aa{sv}", [
                {
                    "Profile": GLib.Variant("s", "power-saver"),
                    "PlatformDriver": GLib.Variant("s", "appiumtest"),
                    "Driver": GLib.Variant("s", "appiumtest"),
                },
                {
                    "Profile": GLib.Variant("s", "balanced"),
                    "PlatformDriver": GLib.Variant("s", "appiumtest"),
                    "Driver": GLib.Variant("s", "appiumtest"),
                },
                {
                    "Profile": GLib.Variant("s", "performance"),
                    "PlatformDriver": GLib.Variant("s", "appiumtest"),
                    "Driver": GLib.Variant("s", "appiumtest"),
                },
            ]),
            "Actions": GLib.Variant("as", ["trickle_charge", "amdgpu_panel_power"]),
            "ActiveProfileHolds": {},  # aa{sv}
            "Version": GLib.Variant("s", "0.21"),
        }
        # Last cookie handed out by hold_profile(); incremented for each hold.
        self.__global_cookie: int = 0
        # Set once the object has been registered on the bus.
        self.registered_event = threading.Event()
        # Set at the end of every set_profile() call.
        self.active_profile_set_event = threading.Event()

        self.__owner_id: int = Gio.bus_own_name(Gio.BusType.SYSTEM, self.BUS_NAME, Gio.BusNameOwnerFlags.NONE, self.on_bus_acquired, None, None)
        assert self.__owner_id > 0

    def quit(self) -> None:
        """
        Unregister the object, give up the bus name and flush the connection.
        """
        self.__connection.unregister_object(self.__ppd_reg_id)
        self.__ppd_reg_id = 0
        Gio.bus_unown_name(self.__owner_id)
        self.__connection.flush_sync(None)  # Otherwise flaky

    def on_bus_acquired(self, connection: Gio.DBusConnection, name: str, *args) -> None:
        """
        Interface is ready, now register objects.
        """
        self.__connection = connection

        xml_path = os.path.join(current_folder, os.pardir, os.pardir, "daemon/actions/bundled/org.freedesktop.UPower.PowerProfiles.xml")
        # read() keeps the file's own line breaks; joining readlines() with
        # "\n" would double every newline.
        with open(xml_path, encoding="utf-8") as file_handler:
            introspection_xml: str = file_handler.read()

        introspection_data = Gio.DBusNodeInfo.new_for_xml(introspection_xml)
        self.__ppd_reg_id = connection.register_object(
            self.OBJECT_PATH,
            introspection_data.interfaces[0],
            self.ppd_handle_method_call,
            self.ppd_handle_get_property,
            self.ppd_handle_set_property,
        )
        assert self.__ppd_reg_id > 0

        self.registered_event.set()

    def __emit_properties_changed(self, changed: dict[str, GLib.Variant]) -> None:
        """
        Emit org.freedesktop.DBus.Properties.PropertiesChanged for IFACE_NAME
        with the given changed properties and no invalidated properties.
        """
        self.__connection.emit_signal(
            None,
            self.OBJECT_PATH,
            "org.freedesktop.DBus.Properties",
            "PropertiesChanged",
            GLib.Variant.new_tuple(
                GLib.Variant("s", self.IFACE_NAME),
                GLib.Variant("a{sv}", changed),
                GLib.Variant("as", []),
            ),
        )

    def set_profile(self, new_profile: str) -> None:
        """
        Make `new_profile` the active profile.

        All existing holds are dropped: a ProfileReleased signal is emitted
        for each released cookie, followed by one PropertiesChanged signal.
        `active_profile_set_event` is set afterwards.
        """
        self.ppd_properties["ActiveProfile"] = GLib.Variant("s", new_profile)
        changed_properties = {
            "ActiveProfile": self.ppd_properties["ActiveProfile"],
        }
        if self.ppd_properties["ActiveProfileHolds"]:
            released_cookies: list[int] = list(self.ppd_properties["ActiveProfileHolds"])
            self.ppd_properties["ActiveProfileHolds"] = {}
            changed_properties["ActiveProfileHolds"] = self.active_profile_holds()
            for cookie in released_cookies:
                # This signal is emitted when a hold is released because the
                # "ActiveProfile" was manually changed. No destination bus name
                # is passed here, so it is not addressed to the process that
                # called "HoldProfile" specifically.
                self.__connection.emit_signal(None, self.OBJECT_PATH, self.IFACE_NAME, "ProfileReleased", GLib.Variant.new_tuple(GLib.Variant("u", cookie)))

        self.__emit_properties_changed(changed_properties)
        self.active_profile_set_event.set()

    def hold_profile(self, profile: str, reason: str, application_id: str) -> int:
        """
        Record a new hold and announce the updated "ActiveProfileHolds".

        Returns the cookie identifying the hold, to be passed to
        release_profile().
        """
        self.__global_cookie += 1
        self.ppd_properties["ActiveProfileHolds"][self.__global_cookie] = {
            "Profile": GLib.Variant("s", profile),
            "Reason": GLib.Variant("s", reason),
            "ApplicationId": GLib.Variant("s", application_id),
        }
        self.__emit_properties_changed({
            "ActiveProfileHolds": self.active_profile_holds(),
        })
        return self.__global_cookie

    def release_profile(self, cookie: int) -> None:
        """
        Drop the hold identified by `cookie` and announce the updated
        "ActiveProfileHolds".

        Raises KeyError if no hold with that cookie exists.
        """
        del self.ppd_properties["ActiveProfileHolds"][cookie]
        self.__emit_properties_changed({
            "ActiveProfileHolds": self.active_profile_holds(),
        })

    def active_profile_holds(self) -> GLib.Variant:
        """
        Return the current holds as an "aa{sv}" variant (cookies are omitted).
        """
        return GLib.Variant("aa{sv}", list(self.ppd_properties["ActiveProfileHolds"].values()))

    def set_performance_degraded_reason(self, reason: str) -> None:
        """
        Update "PerformanceDegraded" and announce the change.
        """
        self.ppd_properties["PerformanceDegraded"] = GLib.Variant("s", reason)
        self.__emit_properties_changed({
            "PerformanceDegraded": self.ppd_properties["PerformanceDegraded"],
        })

    def ppd_handle_method_call(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, method_name: str, parameters: GLib.Variant, invocation: Gio.DBusMethodInvocation) -> None:
        """
        Method-call handler passed to register_object().

        Handles "HoldProfile" and "ReleaseProfile"; any other method is
        answered with an UNKNOWN_METHOD error.
        """
        assert interface_name == self.IFACE_NAME, f"Unknown interface {interface_name}"
        logging.info("ppd calling %s", method_name)

        if method_name == "HoldProfile":
            cookie = self.hold_profile(parameters[0], parameters[1], parameters[2])
            invocation.return_value(GLib.Variant.new_tuple(GLib.Variant("u", cookie)))

        elif method_name == "ReleaseProfile":
            cookie = parameters[0]
            try:
                self.release_profile(cookie)
            except KeyError:
                # Reply with an error instead of letting the exception escape
                # the callback, which would leave the call unanswered.
                logging.error("No hold with cookie %s", cookie)
                invocation.return_error_literal(Gio.dbus_error_quark(), Gio.DBusError.INVALID_ARGS, f"No hold with cookie {cookie}")
            else:
                invocation.return_value(None)

        else:
            logging.error("Unhandled method: %s", method_name)
            invocation.return_error_literal(Gio.dbus_error_quark(), Gio.DBusError.UNKNOWN_METHOD, f"Unknown method {method_name}")

    def ppd_handle_get_property(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, value: Any):
        """
        Property getter passed to register_object().

        `value` is the name of the requested property. Returns the stored
        variant, or None (after logging an error) when the name is unknown.
        """
        if value not in self.ppd_properties:
            logging.error("%s does not exist", value)
            return None
        logging.info("ppd get_property %s", value)
        if value == "ActiveProfileHolds":
            # Stored as a plain dict; convert to a variant on the way out.
            return self.active_profile_holds()
        return self.ppd_properties[value]

    def ppd_handle_set_property(self, connection: Gio.DBusConnection, sender: str, object_path: str, interface_name: str, key: str, value: Any) -> bool:
        """
        Property setter passed to register_object().

        Only "ActiveProfile" is writable. Returns True when the property was
        set and False for any other key.
        """
        logging.info("ppd set_property %s %s", key, value)
        if key == "ActiveProfile":
            self.set_profile(value.get_string())
            return True
        return False