File: hotplug_advanced.py

package info (click to toggle)
python-libusb1 3.3.1%2Bds-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 488 kB
  • sloc: python: 3,259; sh: 98; makefile: 5
file content (237 lines) | stat: -rwxr-xr-x 8,279 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#!/usr/bin/env python
# Copyright (C) 2013-2021  Vincent Pelletier <plr.vincent@gmail.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
"""
Advanced hotplug examples.
Presents ways of integrating hotplug into your userland USB driver.
"""

import select
import sys
import usb1

# A few helpers for demonstration...
# Maps each mode name accepted on the command line to the function which
# runs the corresponding example. Entries are added right after each
# example function below, and main() looks the requested mode up here.
mode_dict = {}

class NoHotplugSupport(Exception):
    """
    Raised when the available libusb lacks the hotplug capability.
    """

def onAwesomeDeviceLeft(awesome_device):
    """
    Report on standard output that the given device is gone.
    """
    description = str(awesome_device)
    print('Device left:', description)

def onAwesomeDeviceArrived(awesome_device):
    """
    Hook the departure notification onto the given device, then report its
    arrival on standard output.
    Returns nothing, which is a false value: the device is not ignored.
    """
    # Set on the instance, so the device calls it back from its close().
    awesome_device.onClose = onAwesomeDeviceLeft
    description = str(awesome_device)
    print('Device arrived:', description)

class SelectPoller(object):
    """
    Dummy poller based on select, because it exists on all platforms.
    WARNING: this class is just for a trivial demonstration, and
    inherits select() limitations. The most important limitation is
    that registering descriptors does not wake/affect a running poll.
    """
    def __init__(self):
        # Maps each registered file descriptor to its event bitmask.
        self._fd_dict = {}

    def register(self, fd, events):
        """
        Watch fd for given events (bitwise-or of select.POLLIN,
        select.POLLOUT and select.POLLPRI).
        Registering an already-registered fd replaces its event mask.
        """
        self._fd_dict[fd] = events

    def unregister(self, fd):
        """
        Stop watching fd.
        Raises KeyError if fd is not registered.
        """
        self._fd_dict.pop(fd)

    def poll(self, timeout=None):
        """
        Wait for events on registered file descriptors.

        timeout
            Passed unchanged to select.select as its timeout argument.

        Returns a list of (fd, events) tuples, one per file descriptor
        select.select reported, events being the bitwise-or of the flags
        which happened on that file descriptor.
        """
        flag_list = (select.POLLIN, select.POLLOUT, select.POLLPRI)
        # One candidate list per flag, in the order select.select expects
        # them (read, write, exceptional), followed by the timeout.
        ready_list = select.select(*(
            [
                [fd for fd, events in self._fd_dict.items() if events & flag]
                for flag in flag_list
            ] + [timeout]
        ))
        result = {}
        for fd_list, happened_flag in zip(ready_list, flag_list):
            # Each ready list must be walked: a file descriptor may show up
            # in several of them, in which case its flags are merged.
            for fd in fd_list:
                result[fd] = result.get(fd, 0) | happened_flag
        return list(result.items())
# (end of demonstration helpers)

class AwesomeDevice(object):
    """
    Demonstration driver object wrapping one opened device handle.
    """
    # Application can set this property to do cleanup when device gets closed,
    # for example when device has left.
    # The default is wrapped in staticmethod so it does not become a bound
    # method when accessed through an instance: close() calls
    # self.onClose(self), which would otherwise pass the device twice and
    # raise TypeError whenever the application did not override onClose.
    onClose = staticmethod(lambda device: None)

    def __init__(self, handle):
        """
        handle
            Opened device handle. It must provide getDevice() and close().
        """
        self._handle = handle

    def __str__(self):
        # For demonstration purposes only.
        return 'Awesome Device at ' + str(self._handle.getDevice())

    def close(self):
        """
        Call onClose with this device, then close the underlying handle.
        """
        # Note: device may have already left when this method is called,
        # so catch USBErrorNoDevice around cleanup steps involving the device.
        try:
            self.onClose(self)
            # Put device in low-power mode, release claimed interfaces...
            pass
        except usb1.USBErrorNoDevice:
            pass
        self._handle.close()

class AwesomeDeviceHoarderBase(object):
    """
    Keeps track of every handled device which is currently plugged in.
    """

    def __init__(
            self,
            onDeviceArrived=(lambda awesome_device: False),
        ):
        """
        onDeviceArrived (callable)
            Called with each freshly-created AwesomeDevice, so the
            application can do more when a relevant device arrives and
            integrate it with other devices (ex: send different key events
            when the same button is pressed on different devices).
            Return a true value to have the device ignored (it is then
            closed right away), a false value to keep it.
            Cannot call synchronous API.

        Raises NoHotplugSupport if libusb reports no hotplug capability.
        """
        self.context = context = usb1.USBContext()
        has_hotplug = context.hasCapability(usb1.CAP_HAS_HOTPLUG)
        if not has_hotplug:
            raise NoHotplugSupport(
                'Hotplug support is missing. Please update your libusb version.'
            )
        self._onDeviceArrived = onDeviceArrived
        # Maps each present USB device to the AwesomeDevice driving it.
        self._device_dict = {}

    def _registerCallback(self):
        """
        Subscribe _onHotplugEvent to device arrival and departure events.
        """
        watched_events = (
            usb1.HOTPLUG_EVENT_DEVICE_ARRIVED |
            usb1.HOTPLUG_EVENT_DEVICE_LEFT
        )
        self.context.hotplugRegisterCallback(
            self._onHotplugEvent,
            # Listed explicitly, in case more events are added in the future.
            events=watched_events,
            # If you handle devices from a single vendor, of a single
            # product type or of a single device class, fill these in and
            # simplify device filtering accordingly in _onHotplugEvent.
            #vendor_id=,
            #product_id=,
            #dev_class=,
        )

    @staticmethod
    def isDeviceSupported(vendor_id, device_id):
        """
        Tell whether the device which just arrived should be driven.
        This can be simplified when libusb hotplug API filtering is
        sufficient (ex: handling a single device type).
        """
        # Every device is accepted in this example.
        return True

    def _onHotplugEvent(self, context, device, event):
        """
        Hotplug callback: forget and close devices which left, open and
        remember supported devices which arrived.
        """
        if event == usb1.HOTPLUG_EVENT_DEVICE_LEFT:
            departed = self._device_dict.pop(device, None)
            if departed is not None:
                departed.close()
            return
        # This check can go away if libusb hotplug API filtering is enough
        # (ex: handling a single device type).
        vendor_id = device.getVendorID()
        product_id = device.getProductID()
        if not self.isDeviceSupported(vendor_id, product_id):
            return
        try:
            handle = device.open()
        except usb1.USBError:
            # Device cannot be opened: leave it alone.
            return
        arrived = AwesomeDevice(handle)
        if self._onDeviceArrived(arrived):
            # Application asked for this device to be ignored.
            arrived.close()
        else:
            self._device_dict[device] = arrived

# Below are alternative APIs. Choose the one most suitable to your needs,
# and ignore the others. This is of course not an exhaustive list.

class AwesomeDeviceHoarderSimple(AwesomeDeviceHoarderBase):
    """
    API 1: USB-event-centric application.
    The most basic variant, meant for userland drivers which do nothing
    but react to USB events.
    """
    def run(self):
        """
        Enter the USB context, register the hotplug callback, then handle
        USB events in an endless loop. Only an exception (ex:
        KeyboardInterrupt) gets out of it.
        """
        context = self.context
        with context:
            print('Registering hotplug callback...')
            self._registerCallback()
            print('Callback registered. Monitoring events, ^C to exit')
            while True:
                context.handleEvents()
def simple():
    """
    Run the USB-event-centric example (API 1).
    """
    hoarder = AwesomeDeviceHoarderSimple(onAwesomeDeviceArrived)
    hoarder.run()
mode_dict['simple'] = simple

class AwesomeDeviceHoarderEventLoop(AwesomeDeviceHoarderBase):
    """
    API 2:
    More complex, for userland drivers which need to react to other events
    (sockets, user input, ...). Application must then integrate libusb
    polling in its event loop (see usb1.USBPollerThread and usb1.USBPoller).
    """
    def __enter__(self):
        """
        Open the USB context and register the hotplug callback.
        Returns self, so the hoarder can be bound by the with statement.
        """
        self.context.open()
        try:
            print('Registering hotplug callback...')
            self._registerCallback()
        except BaseException:
            # __exit__ is not called when __enter__ raises, so the context
            # opened above has to be closed here, or it would stay open.
            self.context.close()
            raise
        print('Callback registered.')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Close the USB context. Exceptions are not suppressed.
        """
        self.context.close()

def eventloop():
    """
    Run the event-loop-integrated example (API 2).
    """
    hoarder_manager = AwesomeDeviceHoarderEventLoop(onAwesomeDeviceArrived)
    with hoarder_manager as hoarder:
        # A real application would register its own file descriptors to
        # this poller, independently from libusb.
        application_poller = SelectPoller()
        # Its event loop would then look somewhat like what follows:
        usb_poller = usb1.USBPoller(hoarder.context, application_poller)
        print('Monitoring events, ^C to exit')
        while True:
            usb_poller.poll()
mode_dict['eventloop'] = eventloop

def main():
    """
    Command-line entry point: run the example named by the first argument.

    Prints usage and exits with status 1 when the argument is missing or
    is not a known mode. Prints the error and exits with status 1 when
    hotplug support is missing. Prints 'Exiting' when the example is
    interrupted by KeyboardInterrupt or SystemExit.
    """
    try:
        mode = mode_dict[sys.argv[1]]
    except (KeyError, IndexError):
        print('Usage: %s [%s]' % (
            sys.argv[0],
            '|'.join(mode_dict),
        ))
        sys.exit(1)
    print(
        'NOTE: this example needs sufficient permissions to be able to '
        'open USB devices to produce any interesting output. If you see '
        'nothing below, check you have USB devices plugged *and* that you '
        'have sufficient permissions to open them.'
    )
    try:
        mode()
    except NoHotplugSupport as exc:
        # Exceptions have no "value" attribute: print the exception itself,
        # which displays the message it was raised with.
        print(exc)
        sys.exit(1)
    except (KeyboardInterrupt, SystemExit):
        print('Exiting')

# Run the selected example only when executed as a script, not when imported.
if __name__ == '__main__':
    main()