File: eventio_async.py

package info (click to toggle)
python-evdev 1.1.2%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 340 kB
  • sloc: python: 1,576; ansic: 607; makefile: 141
file content (93 lines) | stat: -rw-r--r-- 2,719 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# encoding: utf-8

import asyncio
import select

from evdev import eventio
# needed for compatibility
from evdev.eventio import EvdevError


class EventIO(eventio.EventIO):
    def _do_when_readable(self, callback):
        loop = asyncio.get_event_loop()

        def ready():
            loop.remove_reader(self.fileno())
            callback()
        loop.add_reader(self.fileno(), ready)

    def _set_result(self, future, cb):
        try:
            future.set_result(cb())
        except Exception as error:
            future.set_exception(error)

    def async_read_one(self):
        '''
        Asyncio coroutine to read and return a single input event as
        an instance of :class:`InputEvent <evdev.events.InputEvent>`.
        '''
        future = asyncio.Future()
        self._do_when_readable(lambda: self._set_result(future, self.read_one))
        return future

    def async_read(self):
        '''
        Asyncio coroutine to read multiple input events from device. Return
        a generator object that yields :class:`InputEvent <evdev.events.InputEvent>`
        instances.
        '''
        future = asyncio.Future()
        self._do_when_readable(lambda: self._set_result(future, self.read))
        return future

    def async_read_loop(self):
        '''
        Return an iterator that yields input events. This iterator is
        compatible with the ``async for`` syntax.

        '''
        return ReadIterator(self)


class ReadIterator(object):
    def __init__(self, device):
        self.current_batch = iter(())
        self.device = device

    # Standard iterator protocol.
    def __iter__(self):
        return self

    # Python 2.x compatibility.
    def next(self):
        return self.__next__()

    def __next__(self):
        try:
            # Read from the previous batch of events.
            return next(self.current_batch)
        except StopIteration:
            r, w, x = select.select([self.device.fd], [], [])
            self.current_batch = self.device.read()
            return next(self.current_batch)

    def __aiter__(self):
        return self

    @asyncio.coroutine
    def __anext__(self):
        future = asyncio.Future()
        try:
            # Read from the previous batch of events.
            future.set_result(next(self.current_batch))
        except StopIteration:
            def next_batch_ready(batch):
                try:
                    self.current_batch = batch.result()
                    future.set_result(next(self.current_batch))
                except Exception as e:
                    future.set_exception(e)
            self.device.async_read().add_done_callback(next_batch_ready)
        return future