File: inputlistener.py

package info (click to toggle)
screenkey 1%3A1.5-8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 536 kB
  • sloc: python: 3,835; xml: 92; makefile: 9; sh: 8
file content (448 lines) | stat: -rw-r--r-- 15,802 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
# Distributed under the GNU GPLv3+ license, WITHOUT ANY WARRANTY.
# Copyright(c) 2015: wave++ "Yuri D'Elia" <wavexx@thregr.org>
#
# Outputting translated X11 keystrokes is not a simple problem as soon as XIM
# is introduced: getting an updated keyboard/modifier map is not enough to
# replicate the [complex] logic hidden in the input method.
#
# For this reason we use a fairly convoluted mechanism: we record keystrokes
# using the XRecord extension, but we relay them to another fake window running
# on the same server. By manipulating the event, we trick the input method to
# perform composition for us, and poll for translated output events using
# Xutf8LookupString. Since we cannot determine the state of the input context
# for the target window (we're recording blindly), we also need to reset
# context state carefully when the user switches the active focus. 3(!) extra
# connections to the display server are required for this task, and since we're
# using blocking APIs, having to run on our own thread means we cannot share
# any of those with the regular process. On the other hand, many other keycode
# translation issues are avoided by using the string lookup directly.
#
# This is, of course, not always going to be identical to the final output,
# since we're guessing the state of the client (we do the same when guessing
# the result of backspace anyway). But incidentally this method would also
# allow us to poll the input mechanism while composing, to better reflect the
# actual typing on the keyboard.
#
# Some of the code /could/ have been simplified by using XCB for protocol
# translation, but since there's no equivalent to XKB/XIM, I found the exercise
# futile. Needing to use XIM directly also barred pure-python equivalents. As
# a result, we have to drop back to ctypes for _extra_ phun.
#
# Drop me a line if you ever find this comment helpful, as finding a decent
# solution was not trivial -- YD 21/08/2015.

if __name__ == '__main__':
    import xlib
    import keysyms
else:
    from . import xlib
    from . import keysyms

import sys
if sys.version_info.major < 3:
    import glib
else:
    from gi.repository import GLib as glib

import threading
import warnings
import select


# convenience wrappers
def coalesce_ranges(ranges):
    """Merge overlapping or adjacent inclusive integer ranges.

    ``ranges`` is an iterable of ``[first, last]`` pairs (lists or tuples).
    The pairs are sorted by their first element, and a pair is folded into
    the previous one whenever it starts no later than one past the previous
    pair's end.

    Returns a new list of ``[first, last]`` lists. The pairs passed in by
    the caller are never modified.
    """
    merged = []
    for rng in sorted(ranges, key=lambda r: r[0]):
        if merged and rng[0] - 1 <= merged[-1][1]:
            # overlapping or directly adjacent: extend the current range
            merged[-1][1] = max(merged[-1][1], rng[1])
        else:
            # always store a fresh list so the caller's data is not aliased
            merged.append([rng[0], rng[1]])
    return merged


def record_context(dpy, ev_ranges, dev_ranges):
    """Create an XRecord context covering the requested event ranges.

    ``ev_ranges`` are written to the ``delivered_events`` field and
    ``dev_ranges`` to the ``device_events`` field of the allocated
    XRecordRange structures. Both lists are coalesced first, and one
    structure is allocated per index of the longer list. The context is
    created for ``xlib.XRecordAllClients``; the range structures are freed
    again before the context is returned.
    """
    delivered = coalesce_ranges(ev_ranges)
    device = coalesce_ranges(dev_ranges)

    count = max(len(delivered), len(device))
    range_array = (xlib.POINTER(xlib.XRecordRange) * count)()

    for idx in range(count):
        range_array[idx] = xlib.XRecordAllocRange()
        spec = range_array[idx].contents
        if idx < len(delivered):
            first, last = delivered[idx][0], delivered[idx][1]
            spec.delivered_events.first = first
            spec.delivered_events.last = last
        if idx < len(device):
            first, last = device[idx][0], device[idx][1]
            spec.device_events.first = first
            spec.device_events.last = last

    clients = xlib.c_ulong(xlib.XRecordAllClients)
    context = xlib.XRecordCreateContext(dpy, 0, xlib.byref(clients), 1,
                                        range_array, count)

    # the range structures are no longer needed after the create call
    for idx in range(count):
        xlib.XFree(range_array[idx])

    return context


def record_enable(dpy, rec_ctx, callback):
    """Enable ``rec_ctx`` asynchronously on ``dpy`` and relay events.

    Every intercepted record whose category is ``xlib.XRecordFromServer``
    is converted with ``xlib.XWireToEvent`` and handed to ``callback``.
    Records flagged as ``client_swapped`` are skipped with a warning.

    Returns the ``XRecordInterceptProc`` object wrapping the Python
    handler. The caller keeps this reference for as long as the context is
    enabled (see the "keep the record_ref alive" note at the call site).
    """
    def dispatch(data):
        if data.category != xlib.XRecordFromServer:
            return
        if data.client_swapped:
            warnings.warn("cannot handle swapped protocol data")
            return
        callback(xlib.XWireToEvent(dpy, data.data))

    def intercept_proc(_closure, data):
        try:
            dispatch(data.contents)
        finally:
            # release the intercept data even if the callback raised,
            # otherwise the record would never be freed
            xlib.XRecordFreeData(data)

    proc = xlib.XRecordInterceptProc(intercept_proc)
    xlib.XRecordEnableContextAsync(dpy, rec_ctx, proc, None)
    return proc


def create_replay_window(dpy):
    """Create the window that recorded events are replayed to.

    The window is a 1x1 ``InputOnly`` child of the default root window of
    ``dpy`` with ``override_redirect`` set. It is not mapped here. Returns
    the value produced by ``xlib.XCreateWindow``.
    """
    attrs = xlib.XSetWindowAttributes()
    attrs.override_redirect = True
    root = xlib.XDefaultRootWindow(dpy)
    return xlib.XCreateWindow(
        dpy, root,
        0, 0, 1, 1, 0,
        xlib.CopyFromParent, xlib.InputOnly, None,
        xlib.CWOverrideRedirect, xlib.byref(attrs))


def phantom_release(dpy, kev):
    """Tell whether the key event ``kev`` is directly followed by its twin.

    Peeks (without dequeuing) at the next event pending on ``dpy`` and
    returns True when it is a ``KeyPress`` carrying the same state, keycode
    and timestamp as ``kev``. Returns False when nothing is pending.
    """
    if not xlib.XPending(dpy):
        return False

    upcoming = xlib.XEvent()
    xlib.XPeekEvent(dpy, xlib.byref(upcoming))
    if upcoming.type != xlib.KeyPress:
        return False

    if upcoming.xkey.state != kev.state:
        return False
    if upcoming.xkey.keycode != kev.keycode:
        return False
    return upcoming.xkey.time == kev.time


def keysym_to_unicode(keysym):
    """Return the character associated with ``keysym``, or None.

    Keysyms in the ``0x01000000``..``0x0110FFFF`` range are mapped to the
    character whose code point is ``keysym - 0x01000000``. Any other value
    is looked up in ``keysyms.KEYSYMS``, whose entries hold the character
    as their first element. None is returned when the keysym is unknown.
    """
    if 0x01000000 <= keysym <= 0x0110FFFF:
        # `unichr` only exists on Python 2; this module already relies on
        # Python 3 (zero-argument super()), where `chr` covers all of Unicode
        return chr(keysym - 0x01000000)
    keydata = keysyms.KEYSYMS.get(keysym)
    if keydata is not None:
        return keydata[0]
    return None



class KeyData:
    """Plain record describing one keyboard event.

    Every field defaults to None and is stored unchanged under an
    attribute of the same name.
    """

    def __init__(self, pressed=None, filtered=None, repeated=None,
                 string=None, keysym=None, status=None, symbol=None,
                 mods_mask=None, modifiers=None):
        # event flags
        self.pressed, self.filtered, self.repeated = \
            pressed, filtered, repeated
        # translation results
        self.string, self.keysym = string, keysym
        self.status, self.symbol = status, symbol
        # modifier state: raw mask and decoded mapping
        self.mods_mask, self.modifiers = mods_mask, modifiers


class ButtonData:
    """Plain record describing one pointer button event."""

    def __init__(self, btn, pressed):
        # `pressed` is compared against the ButtonPress event type and
        # stored as the resulting boolean
        self.btn = btn
        is_press = (pressed == xlib.ButtonPress)
        self.pressed = is_press


class InputType:
    """Bit flags selecting which kinds of input events are listened for."""
    keyboard = 1 << 0
    button = 1 << 1
    movement = 1 << 2
    # union of every flag above
    all = keyboard | button | movement



class InputListener(threading.Thread):
    """Thread that records X11 input events and reports them via a callback.

    Events are captured with the XRecord extension, re-sent to a private
    replay window and, for keyboard events, run through an X input context
    so that translated strings can be looked up.

    Parameters:
        event_callback: callable scheduled through ``glib.idle_add`` with
            one argument: a ``KeyData``, a ``ButtonData``, or ``None`` when
            initialization failed (the exception is then in ``self.error``).
        input_types: bitmask of ``InputType`` flags selecting what to record.
        kbd_compose: selects the input style requested for the input
            context (``XIMPreeditNothing | XIMStatusNothing`` when true,
            ``XIMPreeditNone | XIMStatusNone`` otherwise).
        kbd_translate: when true, unfiltered key presses are translated
            with ``Xutf8LookupString``; otherwise only a plain keycode
            lookup is performed.
    """

    def __init__(self, event_callback, input_types=InputType.all,
                 kbd_compose=True, kbd_translate=True):
        super().__init__()
        self.event_callback = event_callback
        self.input_types = input_types
        self.kbd_compose = kbd_compose
        self.kbd_translate = kbd_translate
        # held from start() until run() finished (or failed) initializing
        self.lock = threading.Lock()
        self.stopped = True
        # exception raised during initialization, if any
        self.error = None


    def _event_received(self, ev):
        """Relay one recorded event to the replay window.

        Events in the KeyPress..MotionNotify type range are re-sent as-is.
        FocusIn/FocusOut are wrapped in a ClientMessage tagged with
        ``self.custom_atom``.
        """
        if xlib.KeyPress <= ev.type <= xlib.MotionNotify:
            xlib.XSendEvent(self.replay_dpy, self.replay_win, False, 0, ev)
        elif ev.type in [xlib.FocusIn, xlib.FocusOut]:
            # Forward the event as a custom message in the same queue instead
            # of resetting the XIC directly, in order to preserve queued events
            fwd_ev = xlib.XEvent()
            fwd_ev.type = xlib.ClientMessage
            fwd_ev.xclient.message_type = self.custom_atom
            fwd_ev.xclient.format = 32
            fwd_ev.xclient.data[0] = ev.type
            xlib.XSendEvent(self.replay_dpy, self.replay_win, False, 0, fwd_ev)


    def _event_callback(self, data):
        """Invoke the user callback; used as the ``glib.idle_add`` target."""
        self.event_callback(data)
        # returning False keeps the idle source from being called again
        return False

    def _event_processed(self, data):
        """Fill in the derived fields of ``data`` and schedule its delivery."""
        data.symbol = xlib.XKeysymToString(data.keysym)
        if data.string is None:
            data.string = keysym_to_unicode(data.keysym)
        glib.idle_add(self._event_callback, data)


    def _event_modifiers(self, kev, data):
        """Decode the modifier mask of ``kev`` into ``data.modifiers``."""
        data.modifiers = modifiers = {}
        modifiers['shift'] = bool(kev.state & xlib.ShiftMask)
        modifiers['caps_lock'] = bool(kev.state & xlib.LockMask)
        modifiers['ctrl'] = bool(kev.state & xlib.ControlMask)
        modifiers['alt'] = bool(kev.state & xlib.Mod1Mask)
        modifiers['num_lock'] = bool(kev.state & xlib.Mod2Mask)
        modifiers['hyper'] = bool(kev.state & xlib.Mod3Mask)
        modifiers['super'] = bool(kev.state & xlib.Mod4Mask)
        modifiers['alt_gr'] = bool(kev.state & xlib.Mod5Mask)


    def _event_keypress(self, kev, data):
        """Translate a key press through the replay input context.

        Sets ``data.keysym`` and ``data.status`` from the lookup. When the
        lookup returned something, ``data.string`` is also set, unless the
        returned bytes are not valid UTF-8.
        """
        buf = xlib.create_string_buffer(16)
        keysym = xlib.KeySym()
        status = xlib.Status()
        ret = xlib.Xutf8LookupString(self._kbd_replay_xic, kev, buf, len(buf),
                                     xlib.byref(keysym), xlib.byref(status))
        if ret != xlib.NoSymbol:
            if 32 <= keysym.value <= 126:
                # avoid ctrl sequences, just take the character value
                data.string = chr(keysym.value)
            else:
                try:
                    data.string = buf.value.decode('utf-8')
                except UnicodeDecodeError:
                    # deliberately best-effort: leave data.string unset
                    pass
        data.keysym = keysym.value
        data.status = status.value


    def _event_lookup(self, kev, data):
        """Set ``data.keysym`` from a plain keycode lookup."""
        # this is mostly for debugging: we do not account for group/level
        data.keysym = xlib.XkbKeycodeToKeysym(kev.display, kev.keycode, 0, 0)


    def start(self):
        """Start the listener thread.

        The lock is taken here and released by ``run()`` once it has
        finished (or failed) initializing, so ``stop()`` blocks until then.
        If the thread cannot be started, the lock is released again and the
        previous state is restored before the exception propagates.
        """
        self.lock.acquire()
        previous = (self.stopped, self.error)
        self.stopped = False
        self.error = None
        try:
            super().start()
        except Exception:
            # run() never started, so nobody else would release the lock
            self.stopped, self.error = previous
            self.lock.release()
            raise


    def stop(self):
        """Ask the listener to terminate by disabling the record context."""
        with self.lock:
            if not self.stopped:
                self.stopped = True
                xlib.XRecordDisableContext(self.control_dpy, self.record_ctx)


    def _kbd_init(self):
        """Open the input method and create the replay input context.

        Raises:
            Exception: when the input method cannot be opened.
        """
        self._kbd_last_ev = xlib.XEvent()

        if self.kbd_compose:
            style = xlib.XIMPreeditNothing | xlib.XIMStatusNothing
        else:
            style = xlib.XIMPreeditNone | xlib.XIMStatusNone

        # TODO: implement preedit callbacks for on-the-spot composition
        #       (this would fix focus-stealing for the global IM state)
        self._kbd_replay_xim = xlib.XOpenIM(self.replay_dpy, None, None, None)
        if not self._kbd_replay_xim:
            raise Exception("Cannot initialize input method")

        self._kbd_replay_xic = xlib.XCreateIC(self._kbd_replay_xim,
                                              xlib.XNClientWindow, self.replay_win,
                                              xlib.XNInputStyle, style,
                                              None)
        xlib.XSetICFocus(self._kbd_replay_xic)


    def _kbd_del(self):
        """Destroy the input context and close the input method."""
        xlib.XDestroyIC(self._kbd_replay_xic)
        xlib.XCloseIM(self._kbd_replay_xim)


    def _kbd_process(self, ev):
        """Handle one event read from the replay connection.

        Forwarded focus messages reset the input context. Every other event
        is passed to ``XFilterEvent``; key presses and releases are then
        turned into a ``KeyData`` record and scheduled for delivery.
        """
        if ev.type == xlib.ClientMessage and \
           ev.xclient.message_type == self.custom_atom:
            if ev.xclient.data[0] in [xlib.FocusIn, xlib.FocusOut]:
                # we do not keep track of multiple XICs, just reset
                xic = xlib.Xutf8ResetIC(self._kbd_replay_xic)
                if xic is not None: xlib.XFree(xic)
            return
        elif ev.type in [xlib.KeyPress, xlib.KeyRelease]:
            # fake keyboard event data for XFilterEvent
            ev.xkey.send_event = False
            ev.xkey.window = self.replay_win

        # pass _all_ events to XFilterEvent
        filtered = bool(xlib.XFilterEvent(ev, 0))
        if ev.type == xlib.KeyRelease and \
           phantom_release(self.replay_dpy, ev.xkey):
            # a release directly followed by its matching press is dropped
            return
        if ev.type not in [xlib.KeyPress, xlib.KeyRelease]:
            return

        # generate new keyboard event
        data = KeyData()
        data.filtered = filtered
        data.pressed = (ev.type == xlib.KeyPress)
        # same type, state and keycode as the previously reported key event
        data.repeated = (ev.type == self._kbd_last_ev.type and \
                         ev.xkey.state == self._kbd_last_ev.xkey.state and \
                         ev.xkey.keycode == self._kbd_last_ev.xkey.keycode)
        data.mods_mask = ev.xkey.state
        self._event_modifiers(ev.xkey, data)
        if not data.filtered and data.pressed and self.kbd_translate:
            self._event_keypress(ev.xkey, data)
        else:
            self._event_lookup(ev.xkey, data)
        self._event_processed(data)
        self._kbd_last_ev = ev


    def _btn_process(self, ev):
        """Schedule delivery of a ``ButtonData`` for button press/release."""
        if ev.type in [xlib.ButtonPress, xlib.ButtonRelease]:
            data = ButtonData(ev.xbutton.button, ev.type)
            glib.idle_add(self._event_callback, data)


    def run(self):
        """Thread body: set up the X connections and pump events.

        Expects ``self.lock`` to be held on entry (see ``start()``). The
        lock is released once initialization is done. If keyboard
        initialization fails, the exception is stored in ``self.error`` and
        the callback is scheduled with ``None``.
        """
        # control connection
        self.control_dpy = xlib.XOpenDisplay(None)
        xlib.XSynchronize(self.control_dpy, True)

        # unmapped replay window
        self.replay_dpy = xlib.XOpenDisplay(None)
        self.custom_atom = xlib.XInternAtom(self.replay_dpy, b"SCREENKEY", False)
        replay_fd = xlib.XConnectionNumber(self.replay_dpy)
        self.replay_win = create_replay_window(self.replay_dpy)

        # bail during initialization errors
        try:
            if self.input_types & InputType.keyboard:
                self._kbd_init()
        except Exception as e:
            self.error = e
            xlib.XCloseDisplay(self.control_dpy)
            xlib.XDestroyWindow(self.replay_dpy, self.replay_win)
            xlib.XCloseDisplay(self.replay_dpy)

            # cheap wakeup() equivalent for compatibility
            glib.idle_add(self._event_callback, None)

            self.stopped = True
            self.lock.release()
            return

        # initialize recording context
        ev_ranges = []
        dev_ranges = []
        if self.input_types & InputType.keyboard:
            ev_ranges.append([xlib.FocusIn, xlib.FocusOut])
            dev_ranges.append([xlib.KeyPress, xlib.KeyRelease])
        if self.input_types & InputType.button:
            dev_ranges.append([xlib.ButtonPress, xlib.ButtonRelease])
        if self.input_types & InputType.movement:
            dev_ranges.append([xlib.MotionNotify, xlib.MotionNotify])
        self.record_ctx = record_context(self.control_dpy, ev_ranges, dev_ranges)

        record_dpy = xlib.XOpenDisplay(None)
        record_fd = xlib.XConnectionNumber(record_dpy)
        # we need to keep the record_ref alive(!)
        record_ref = record_enable(record_dpy, self.record_ctx, self._event_received)

        # event loop
        self.lock.release()
        while True:
            with self.lock:
                if self.stopped:
                    break

            # only block in select() when neither connection has queued data
            r_fd = []
            if xlib.XPending(record_dpy):
                r_fd.append(record_fd)
            if xlib.XPending(self.replay_dpy):
                r_fd.append(replay_fd)
            if not r_fd:
                r_fd, _, _ = select.select([record_fd, replay_fd], [], [])
            if not r_fd:
                break

            if record_fd in r_fd:
                xlib.XRecordProcessReplies(record_dpy)
                # push out the events re-sent by _event_received
                xlib.XFlush(self.replay_dpy)

            if replay_fd in r_fd:
                ev = xlib.XEvent()
                xlib.XNextEvent(self.replay_dpy, xlib.byref(ev))
                if self.input_types & InputType.keyboard:
                    self._kbd_process(ev)
                if self.input_types & InputType.button:
                    self._btn_process(ev)

        # finalize
        self.lock.acquire()

        xlib.XRecordFreeContext(self.control_dpy, self.record_ctx)
        xlib.XCloseDisplay(self.control_dpy)
        xlib.XCloseDisplay(record_dpy)
        del record_ref

        if self.input_types & InputType.keyboard:
            self._kbd_del()

        xlib.XDestroyWindow(self.replay_dpy, self.replay_win)
        xlib.XCloseDisplay(self.replay_dpy)

        self.stopped = True
        self.lock.release()



if __name__ == '__main__':
    # Small manual test driver: print every event reported by the listener.
    def callback(data):
        """Print the public attributes of the received event record."""
        values = {k: getattr(data, k) for k in dir(data) if k[0] != '_'}
        print(values)

    kl = InputListener(callback)
    try:
        # keep running only while the listener is alive
        kl.start()
        while kl.is_alive():
            glib.main_context_default().iteration()
    except KeyboardInterrupt:
        pass

    # check if the thread terminated unexpectedly
    if kl.is_alive():
        kl.stop()
        kl.join()
    elif kl.error:
        print("initialization error: {}".format(kl.error))
        tb = getattr(kl.error, '__traceback__', None)
        if tb is not None:
            import traceback
            traceback.print_tb(tb)
        # sys.exit() rather than the site-provided exit(), which is meant
        # for the interactive shell and is absent under `python -S`
        sys.exit(1)