File: reader_client.py

package info (click to toggle)
input-remapper 2.2.0%2Bgit20251231.d605b42-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 2,896 kB
  • sloc: python: 27,435; sh: 45; xml: 33; makefile: 9
file content (289 lines) | stat: -rw-r--r-- 10,309 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# -*- coding: utf-8 -*-
# input-remapper - GUI for device specific keyboard mappings
# Copyright (C) 2025 sezanzeb <b8x45ygc9@mozmail.com>
#
# This file is part of input-remapper.
#
# input-remapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# input-remapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with input-remapper.  If not, see <https://www.gnu.org/licenses/>.


"""Talking to the ReaderService that has root permissions.

see gui.reader_service.ReaderService
"""

import time
from typing import Optional, List, Generator, Dict, Set

import evdev
from gi.repository import GLib

from inputremapper.configs.input_config import InputCombination
from inputremapper.groups import _Groups, _Group
from inputremapper.gui.gettext import _
from inputremapper.gui.messages.message_broker import MessageBroker
from inputremapper.gui.messages.message_data import (
    GroupsData,
    CombinationRecorded,
    StatusData,
)
from inputremapper.gui.messages.message_types import MessageType
from inputremapper.gui.reader_service import (
    MSG_EVENT,
    MSG_GROUPS,
    CMD_TERMINATE,
    CMD_REFRESH_GROUPS,
    CMD_STOP_READING,
    ReaderService,
)
from inputremapper.gui.utils import CTX_ERROR
from inputremapper.input_event import InputEvent
from inputremapper.ipc.pipe import Pipe
from inputremapper.logging.logger import logger

BLACKLISTED_EVENTS = [(1, evdev.ecodes.BTN_TOOL_DOUBLETAP)]
RecordingGenerator = Generator[None, InputEvent, None]


class ReaderClient:
    """Processes events from the reader-service for the GUI to use.

    Does not serve any purpose for the injection service.

    When a button was pressed, the newest keycode can be obtained from this object.
    GTK has get_key for keyboard keys, but Reader also has knowledge of buttons like
    the middle-mouse button.
    """

    # how long to wait for the reader-service at most
    _timeout: int = 5

    def __init__(self, message_broker: MessageBroker, groups: _Groups):
        self.groups = groups
        self.message_broker = message_broker

        self.group: Optional[_Group] = None

        self._recording_generator: Optional[RecordingGenerator] = None
        self._results_pipe, self._commands_pipe = self.connect()

        self.attach_to_events()

        self._read_timeout = GLib.timeout_add(30, self._read)

    def ensure_reader_service_running(self):
        if ReaderService.is_running():
            return

        logger.info("ReaderService not running anymore, restarting")
        ReaderService.pkexec_reader_service()

        # wait until the ReaderService is up

        # wait no more than:
        polling_period = 0.01
        # this will make the gui non-responsive for 0.4s or something. The pkexec
        # password prompt will appear, so the user understands that the lag has to
        # be connected to the authentication. I would actually prefer the frozen gui
        # over a reactive one here, because the short lag shows that stuff is going on
        # behind the scenes.
        for __ in range(int(self._timeout / polling_period)):
            if self._results_pipe.poll():
                logger.info("ReaderService started")
                break

            time.sleep(polling_period)
        else:
            msg = "The reader-service did not start"
            logger.error(msg)
            self.message_broker.publish(StatusData(CTX_ERROR, _(msg)))

    def _send_command(self, command: str):
        """Send a command to the ReaderService."""
        if command not in [CMD_TERMINATE, CMD_STOP_READING]:
            self.ensure_reader_service_running()

        logger.debug('Sending "%s" to ReaderService', command)
        self._commands_pipe.send(command)

    def connect(self):
        """Connect to the reader-service."""
        results_pipe = Pipe(ReaderService.get_pipe_paths()[0])
        commands_pipe = Pipe(ReaderService.get_pipe_paths()[1])
        return results_pipe, commands_pipe

    def attach_to_events(self):
        """Connect listeners to event_reader."""
        self.message_broker.subscribe(
            MessageType.terminate,
            lambda _: self.terminate(),
        )

    def _read(self):
        """Read the messages from the reader-service and handle them."""
        while self._results_pipe.poll():
            message = self._results_pipe.recv()

            logger.debug("received %s", message)

            message_type = message["type"]
            message_body = message["message"]

            if message_type == MSG_GROUPS:
                self._update_groups(message_body)

            if message_type == MSG_EVENT:
                # update the generator
                try:
                    if self._recording_generator is not None:
                        self._recording_generator.send(InputEvent(**message_body))
                    else:
                        # the ReaderService should only send events while the gui
                        # is recording, so this is unexpected.
                        logger.error("Got event, but recorder is not running.")
                except StopIteration:
                    # the _recording_generator returned
                    logger.debug("Recorder finished.")
                    self.stop_recorder()
                    break

        return True

    def start_recorder(self) -> None:
        """Record user input."""
        if self.group is None:
            logger.error("No group set")
            return

        logger.debug("Starting recorder.")
        self._send_command(self.group.key)

        self._recording_generator = self._recorder()
        next(self._recording_generator)

        self.message_broker.signal(MessageType.recording_started)

    def stop_recorder(self) -> None:
        """Stop recording the input.

        Will send recording_finished signals.
        """
        logger.debug("Stopping recorder.")
        self._send_command(CMD_STOP_READING)

        if self._recording_generator:
            self._recording_generator.close()
            self._recording_generator = None
        else:
            # this would be unexpected. but this is not critical enough to
            # show to the user without debug logs
            logger.debug("No recording generator existed")

        self.message_broker.signal(MessageType.recording_finished)

    @staticmethod
    def _input_event_to_config(event: InputEvent):
        return {
            "type": event.type,
            "code": event.code,
            "analog_threshold": event.value,
            "origin_hash": event.origin_hash,
        }

    def _recorder(self) -> RecordingGenerator:
        """Generator which receives InputEvents.

        It accumulates them into EventCombinations and sends those on the
        message_broker. It will stop once all keys or inputs are released.
        """
        active: Set = set()
        accumulator: List[InputEvent] = []
        while True:
            event: InputEvent = yield
            if event.type_and_code in BLACKLISTED_EVENTS:
                continue

            if event.value == 0:
                try:
                    active.remove(event.input_match_hash)
                except KeyError:
                    # we haven't seen this before probably a key got released which
                    # was pressed before we started recording. ignore it.
                    continue

                if not active:
                    # all previously recorded events are released
                    return
                continue

            active.add(event.input_match_hash)
            accu_input_hashes = [e.input_match_hash for e in accumulator]
            if event.input_match_hash in accu_input_hashes and event not in accumulator:
                # the value has changed but the event is already in the accumulator
                # update the event
                i = accu_input_hashes.index(event.input_match_hash)
                accumulator[i] = event
                self.message_broker.publish(
                    CombinationRecorded(
                        InputCombination(map(self._input_event_to_config, accumulator))
                    )
                )

            if event not in accumulator:
                accumulator.append(event)
                self.message_broker.publish(
                    CombinationRecorded(
                        InputCombination(map(self._input_event_to_config, accumulator))
                    )
                )

    def set_group(self, group: Optional[_Group]):
        """Set the group for which input events should be read later."""
        # TODO load the active_group from the controller instead?
        self.group = group

    def terminate(self):
        """Stop reading keycodes for good."""
        self._send_command(CMD_TERMINATE)

        self.stop_recorder()

        if self._read_timeout is not None:
            GLib.source_remove(self._read_timeout)
            self._read_timeout = None

        while self._results_pipe.poll():
            self._results_pipe.recv()

    def refresh_groups(self):
        """Ask the ReaderService for new device groups."""
        self._send_command(CMD_REFRESH_GROUPS)

    def publish_groups(self):
        """Announce all known groups."""
        groups: Dict[str, List[str]] = {
            group.key: group.types or []
            for group in self.groups.filter(include_inputremapper=False)
        }
        self.message_broker.publish(GroupsData(groups))

    def _update_groups(self, dump: str):
        if dump != self.groups.dumps():
            self.groups.loads(dump)
            logger.debug("Received %d devices", len(self.groups))
            self._groups_updated = True

        # send this even if the groups did not change, as the user expects the ui
        # to respond in some form
        self.publish_groups()