File: socket.py

package info (click to toggle)
input-remapper 2.2.0%2Bgit20251231.d605b42-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 2,896 kB
  • sloc: python: 27,435; sh: 45; xml: 33; makefile: 9
file content (303 lines) | stat: -rw-r--r-- 9,418 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# -*- coding: utf-8 -*-
# input-remapper - GUI for device specific keyboard mappings
# Copyright (C) 2025 sezanzeb <b8x45ygc9@mozmail.com>
#
# This file is part of input-remapper.
#
# input-remapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# input-remapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with input-remapper.  If not, see <https://www.gnu.org/licenses/>.


"""Non-blocking abstraction of unix domain sockets.

>>> server = Server('foo')
>>> client = Client('foo')

>>> server.send(1)
>>> client.poll()
>>> client.recv()

>>> client.send(2)
>>> server.poll()
>>> server.recv()

It seems harder for other non-root processes to sniff on a socket than on
pipes, but that doesn't guarantee security. As long as the GUI is open
and not running as root user, it is most likely possible to somehow log
keycodes by looking into the memory of the gui process (just like with most
other applications because they end up receiving keyboard input as well).
It still appears to be a bit overkill to use a socket considering pipes
are much easier to handle.
"""


# Issues:
# - Tests don't pass with Server and Client instead of Pipe for reader-client
#   and service communication or something
# - Had one case of a test that was blocking forever, seems very rare.
# - Hard to debug, generally very problematic compared to Pipes
# The tool works fine, it's just the tests. BrokenPipe errors reported
# by _Server all the time.


import json
import os
import select
import socket
import time
from typing import Union

from inputremapper.configs.paths import PathUtils
from inputremapper.logging.logger import logger

# Delimiter that is appended to every message that is sent, and that is used
# to split the received bytes into messages again.
# It is something funny that most likely won't appear in messages.
# It also contains a byte of ones so that 01 in the payload won't offset
# a match by 2 bits.
END = b"\x55\x55\xff\x55"  # should be 01010101 01010101 11111111 01010101

# encoding of the json text that goes over the socket
ENCODING = "utf8"


# Servers and clients that have been created so far, by their socket path.
# Reusing existing objects makes tests easier, no headaches about closing
# and reopening anymore. The ui also runs only one instance of each all
# the time.
existing_servers = {}
existing_clients = {}


class Base:
    """Abstract base class for Server and Client.

    Messages are json-encoded ``(timestamp, payload)`` pairs, each of them
    followed by the END marker. Subclasses have to implement ``connect``,
    ``fileno`` and ``reconnect``, and are expected to put the object that is
    used for sending and receiving into ``self.connection``.
    """

    def __init__(self, path):
        """Prepare the directory of the socket and try to connect.

        Parameters
        ----------
        path
            Path of the unix domain socket file.
        """
        self._path = path
        # payloads that have been received, but not yet returned by recv
        self._unread = []
        # encoded messages that could not be sent yet
        self.unsent = []
        # received bytes of a message of which the END marker is still missing
        self._partial = b""
        PathUtils.mkdir(os.path.dirname(path))
        self.connection = None
        self.socket = None
        self._created_at = 0
        self.reset()

    def reset(self):
        """Ignore older messages than now."""
        # ensure it is connected
        self.connect()
        self._created_at = time.time()

    def connect(self):
        """Returns True if connected, and if not attempts to connect."""
        raise NotImplementedError

    def fileno(self):
        """For compatibility with select.select."""
        raise NotImplementedError

    def reconnect(self):
        """Try to make a new connection."""
        raise NotImplementedError

    def _store_messages(self, data):
        """Put the payloads of all complete messages in data into _unread.

        Messages that are older than the last reset are ignored, and so are
        messages that cannot be parsed.

        Returns the bytes after the last END marker, which belong to a
        message that is not complete yet. Empty if there is no such message.
        """
        # split always returns at least one element. The last one is whatever
        # follows the last END marker.
        *frames, rest = bytes(data).split(END)

        for frame in frames:
            if len(frame) == 0:
                continue

            try:
                parsed = json.loads(frame.decode(ENCODING))
                timestamp, payload = parsed
                outdated = timestamp < self._created_at
            except (ValueError, TypeError):
                # Not utf8, not json, or not a (timestamp, payload) pair.
                # One broken message should not prevent the other messages
                # from being read.
                logger.error(
                    '%s: Ignoring malformed message received via "%s"',
                    type(self).__name__,
                    self._path,
                )
                continue

            if outdated:
                # important to avoid race conditions between multiple
                # unittests, for example old terminate messages reaching
                # a new instance of the reader-service.
                logger.debug("Ignoring old message %s", parsed)
                continue

            self._unread.append(payload)

        return rest

    def _receive_new_messages(self):
        """Read all bytes that are available and store the messages in them.

        Bytes of a message that has not been received completely yet are
        kept, and completed during one of the next calls.
        """
        if not self.connect():
            logger.debug("Not connected")
            return

        pending = bytearray(self._partial)
        self._partial = b""
        attempts = 0
        while True:
            try:
                chunk = self.connection.recvmsg(4096)[0]
                if len(chunk) > 0:
                    pending += chunk
                    continue

                # select keeps telling me the socket has messages
                # ready to be received, and I keep getting empty
                # buffers. Happened during a test that ran two reader-service
                # processes without stopping the first one.
                # An empty buffer means that the other side is gone. Keep the
                # complete messages it sent. An incomplete one can never be
                # completed anymore, so it is discarded instead of being
                # mixed up with the bytes of a new connection.
                self._store_messages(pending)
                pending.clear()

                attempts += 1
                if attempts == 2 or not self.reconnect():
                    break

            except (socket.timeout, BlockingIOError):
                # nothing left to read for now
                break

        self._partial = self._store_messages(pending)

    def recv(self):
        """Get the next message or None if nothing to read.

        Doesn't transmit pickles, to avoid injection attacks on the
        privileged reader-service. Only messages that can be converted to json
        are allowed.
        """
        self._receive_new_messages()

        if len(self._unread) == 0:
            return None

        return self._unread.pop(0)

    def poll(self):
        """Check if a message to read is available."""
        if len(self._unread) > 0:
            return True

        self._receive_new_messages()
        return len(self._unread) > 0

    def send(self, message: Union[str, int, float, dict, list, tuple]):
        """Send json-serializable messages.

        The message is queued in ``unsent`` together with the current time.
        If there is no connection it stays there, and is sent together with
        the next message for which a connection exists.
        """
        dump = bytes(json.dumps((time.time(), message)), ENCODING)
        self.unsent.append(dump)

        if not self.connect():
            logger.debug("Not connected")
            return

        def send_all():
            while len(self.unsent) > 0:
                unsent = self.unsent[0]
                self.connection.sendall(unsent + END)
                # sending worked, remove message
                self.unsent.pop(0)

        # attempt sending twice in case it fails
        try:
            send_all()
        except BrokenPipeError:
            if not self.reconnect():
                logger.error(
                    '%s: The other side of "%s" disappeared',
                    type(self).__name__,
                    self._path,
                )
                return

            try:
                send_all()
            except BrokenPipeError as error:
                logger.error(
                    '%s: Failed to send via "%s": %s',
                    type(self).__name__,
                    self._path,
                    error,
                )


class _Client(Base):
    """A socket that can be written to and read from.

    It connects to the socket file that has been created by a server, and
    uses the same socket object for connecting, sending and receiving.
    """

    def connect(self):
        """Returns True if connected, and if not attempts to connect."""
        if self.socket is not None:
            return True

        _socket = None
        try:
            _socket = socket.socket(socket.AF_UNIX)
            _socket.connect(self._path)
            logger.debug('Connected to socket: "%s"', self._path)
            _socket.setblocking(False)
        except Exception as error:
            logger.debug('Failed to connect to "%s": "%s"', self._path, error)
            if _socket is not None:
                # don't leave the file descriptor of the failed attempt to
                # the garbage collector
                _socket.close()
            return False

        self.socket = _socket
        self.connection = _socket
        existing_clients[self._path] = self
        return True

    def fileno(self):
        """For compatibility with select.select.

        Fails with an AttributeError if no connection could be made, because
        there is no socket to get the file descriptor from in that case.
        """
        self.connect()
        return self.socket.fileno()

    def reconnect(self):
        """Close the current socket and try to make a new connection.

        Returns True if the new connection could be made.
        """
        stale = self.socket
        self.connection = None
        self.socket = None

        if stale is not None:
            try:
                stale.close()
            except OSError as error:
                # the socket is going to be replaced anyway
                logger.debug(
                    'Failed to close the old socket of "%s": "%s"',
                    self._path,
                    error,
                )

        return self.connect()


def Client(path):
    """Return the client for the socket at path.

    A client that has been created for this path before is reset and
    returned again, otherwise a new one is created.
    """
    known = existing_clients.get(path)
    if known is None:
        return _Client(path)

    # ensure it is running, might have been closed
    known.reset()
    return known


class _Server(Base):
    """A socket that can be written to and read from.

    It accepts one connection at a time, and drops old connections if
    a new one is in sight.
    """

    def _drop_connection(self, connection):
        """Close a connection that is not used anymore."""
        if connection is None:
            return

        try:
            connection.close()
        except OSError as error:
            # it is not going to be used anymore either way
            logger.debug(
                'Failed to close an old connection of "%s": "%s"',
                self._path,
                error,
            )

    def connect(self):
        """Create the socket if needed, and accept a waiting client.

        Returns True if there is a connection to a client afterwards, and
        False if there is none and no client is attempting to connect.
        """
        if self.socket is None:
            try:
                # leftover from the previous execution
                os.remove(self._path)
            except FileNotFoundError:
                # nothing to clean up. Removing without checking first avoids
                # a race with other processes, and also gets rid of dangling
                # symlinks.
                pass

            _socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            _socket.bind(self._path)
            _socket.listen(1)
            PathUtils.chown(self._path)
            logger.debug('Created socket: "%s"', self._path)
            self.socket = _socket
            self.socket.setblocking(False)
            existing_servers[self._path] = self

        # a timeout of 0 makes select return right away
        incoming = len(select.select([self.socket], [], [], 0)[0]) != 0
        if not incoming:
            # Either there is an old connection that is kept, or there is no
            # existing connection and no client attempting to connect.
            return self.connection is not None

        logger.debug('Incoming connection: "%s"', self._path)
        connection = self.socket.accept()[0]
        superseded = self.connection
        self.connection = connection
        self.connection.setblocking(False)
        # only one connection at a time, drop the one that has been replaced
        self._drop_connection(superseded)

        return True

    def fileno(self):
        """For compatibility with select.select.

        Fails with an AttributeError if no client is connected, because
        there is no connection to get the file descriptor from in that case.
        """
        self.connect()
        return self.connection.fileno()

    def reconnect(self):
        """Drop the current connection and accept a new one if possible.

        Returns True if a client was waiting and has been accepted.
        """
        dropped = self.connection
        self.connection = None
        self._drop_connection(dropped)
        return self.connect()


def Server(path):
    """Return the server for the socket at path.

    A server that has been created for this path before is reset and
    returned again, otherwise a new one is created.
    """
    known = existing_servers.get(path)
    if known is None:
        return _Server(path)

    # ensure it is running, might have been closed
    known.reset()
    return known