File: discover.py

package info (click to toggle)
pyudev 0.21.0-1
  • links: PTS, VCS
  • area: main
  • in suites: buster, stretch
  • size: 716 kB
  • ctags: 870
  • sloc: python: 4,122; makefile: 16
file content (390 lines) | stat: -rw-r--r-- 11,508 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# -*- coding: utf-8 -*-
# Copyright (C) 2015 mulhern <amulhern@redhat.com>

# This library is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation; either version 2.1 of the License, or (at your
# option) any later version.

# This library is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
# for more details.

# You should have received a copy of the GNU Lesser General Public License
# along with this library; if not, write to the Free Software Foundation,
# Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA


"""
    pyudev.discover
    ===============

    Tools to discover a device given limited information.

    .. moduleauthor::  mulhern <amulhern@redhat.com>
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import abc
import functools
import os
import re
import six

from pyudev.device import Devices
from pyudev.device import DeviceNotFoundError


def wrap_exception(func):
    """
    Allow Device discovery methods to return None instead of raising an
    exception.
    """

    @functools.wraps(func)
    def the_func(*args, **kwargs):
        """
        Returns result of calling ``func`` on ``args``, ``kwargs``.
        Returns None if ``func`` raises :exc:`DeviceNotFoundError`.
        """
        try:
            return func(*args, **kwargs)
        except DeviceNotFoundError:
            return None

    return the_func

@six.add_metaclass(abc.ABCMeta)
class Hypothesis(object):
    """
    Represents a hypothesis about the meaning of the device identifier.
    """

    @classmethod
    @abc.abstractmethod
    def match(cls, value): # pragma: no cover
        """
        Match the given string according to the hypothesis.

        The purpose of this method is to obtain a value corresponding to
        ``value`` if that is possible. It may use a regular expression, but
        in general it should just return ``value`` and let the lookup method
        sort out the rest.

        :param str value: the string to inspect
        :returns: the matched thing or None if unmatched
        :rtype: the type of lookup's key parameter or NoneType
        """
        raise NotImplementedError()

    @classmethod
    @abc.abstractmethod
    def lookup(cls, context, key): # pragma: no cover
        """
        Lookup the given string according to the hypothesis.

        :param Context context: the pyudev context
        :param key: a key with which to lookup the device
        :type key: the type of match's return value if not None
        :returns: a list of Devices obtained
        :rtype: frozenset of :class:`Device`
        """
        raise NotImplementedError()

    @classmethod
    def setup(cls, context):
        """
        A potentially expensive method that may allow an :class:`Hypothesis`
        to find devices more rapidly or to find a device that it would
        otherwise miss.

        :param Context context: the pyudev context
        """
        pass

    @classmethod
    def get_devices(cls, context, value):
        """
        Get any devices that may correspond to the given string.

        :param Context context: the pyudev context
        :param str value: the value to look for
        :returns: a list of devices obtained
        :rtype: set of :class:`Device`
        """
        key = cls.match(value)
        return cls.lookup(context, key) if key is not None else frozenset()


class DeviceNumberHypothesis(Hypothesis):
    """
    Represents the hypothesis that the device is a device number.

    The device may be separated into major/minor number or a composite number.
    """

    @classmethod
    def _match_major_minor(cls, value):
        """
        Match the number under the assumption that it is a major,minor pair.

        :param str value: value to match
        :returns: the device number or None
        :rtype: int or NoneType
        """
        major_minor_re = re.compile(
           r'^(?P<major>\d+)(\D+)(?P<minor>\d+)$'
        )
        match = major_minor_re.match(value)
        return match and os.makedev(
           int(match.group('major')),
           int(match.group('minor'))
        )

    @classmethod
    def _match_number(cls, value):
        """
        Match the number under the assumption that it is a single number.

        :param str value: value to match
        :returns: the device number or None
        :rtype: int or NoneType
        """
        number_re = re.compile(r'^(?P<number>\d+)$')
        match = number_re.match(value)
        return match and int(match.group('number'))

    @classmethod
    def match(cls, value):
        """
        Match the number under the assumption that it is a device number.

        :returns: the device number or None
        :rtype: int or NoneType
        """
        return cls._match_major_minor(value) or cls._match_number(value)

    @classmethod
    def find_subsystems(cls, context):
        """
        Find subsystems in /sys/dev.

        :param Context context: the context
        :returns: a lis of available subsystems
        :rtype: list of str
        """
        sys_path = context.sys_path
        return os.listdir(os.path.join(sys_path, 'dev'))

    @classmethod
    def lookup(cls, context, key):
        """
        Lookup by the device number.

        :param Context context: the context
        :param int key: the device number
        :returns: a list of matching devices
        :rtype: frozenset of :class:`Device`
        """
        func = wrap_exception(Devices.from_device_number)
        res = (func(context, s, key) for s in cls.find_subsystems(context))
        return frozenset(r for r in res if r is not None)


class DevicePathHypothesis(Hypothesis):
    """
    Discover the device assuming the identifier is a device path.
    """

    @classmethod
    def match(cls, value):
        """
        Match ``value`` under the assumption that it is a device path.

        :returns: the device path or None
        :rtype: str or NoneType
        """
        return value

    @classmethod
    def lookup(cls, context, key):
        """
        Lookup by the path.

        :param Context context: the context
        :param str key: the device path
        :returns: a list of matching devices
        :rtype: frozenset of :class:`Device`
        """
        res = wrap_exception(Devices.from_path)(context, key)
        return frozenset((res,)) if res is not None else frozenset()


class DeviceNameHypothesis(Hypothesis):
    """
    Discover the device assuming the input is a device name.

    Try every available subsystem.
    """

    @classmethod
    def find_subsystems(cls, context):
        """
        Find all subsystems in sysfs.

        :param Context context: the context
        :rtype: frozenset
        :returns: subsystems in sysfs
        """
        sys_path = context.sys_path
        dirnames = ('bus', 'class', 'subsystem')
        absnames = (os.path.join(sys_path, name) for name in dirnames)
        realnames = (d for d in absnames if os.path.isdir(d))
        return frozenset(n for d in realnames for n in os.listdir(d))

    @classmethod
    def match(cls, value):
        """
        Match ``value`` under the assumption that it is a device name.

        :returns: the device path or None
        :rtype: str or NoneType
        """
        return value

    @classmethod
    def lookup(cls, context, key):
        """
        Lookup by the path.

        :param Context context: the context
        :param str key: the device path
        :returns: a list of matching devices
        :rtype: frozenset of :class:`Device`
        """
        func = wrap_exception(Devices.from_name)
        res = (func(context, s, key) for s in cls.find_subsystems(context))
        return frozenset(r for r in res if r is not None)


class DeviceFileHypothesis(Hypothesis):
    """
    Discover the device assuming the value is some portion of a device file.

    The device file may be a link to a device node.
    """

    _LINK_DIRS = [
       '/dev',
       '/dev/disk/by-id',
       '/dev/disk/by-label',
       '/dev/disk/by-partlabel',
       '/dev/disk/by-partuuid',
       '/dev/disk/by-path',
       '/dev/disk/by-uuid',
       '/dev/input/by-path',
       '/dev/mapper',
       '/dev/md',
       '/dev/vg'
    ]

    @classmethod
    def get_link_dirs(cls, context):
        """
        Get all directories that may contain links to device nodes.

        This method checks the device links of every device, so it is very
        expensive.

        :param Context context: the context
        :returns: a sorted list of directories that contain device links
        :rtype: list
        """
        devices = context.list_devices()
        devices_with_links = (d for d in devices if list(d.device_links))
        links = (l for d in devices_with_links for l in d.device_links)
        return sorted(set(os.path.dirname(l) for l in links))

    @classmethod
    def setup(cls, context):
        """
        Set the link directories to be used when discovering by file.

        Uses `get_link_dirs`, so is as expensive as it is.

        :param Context context: the context
        """
        cls._LINK_DIRS = cls.get_link_dirs(context)

    @classmethod
    def match(cls, value):
        return value

    @classmethod
    def lookup(cls, context, key):
        """
        Lookup the device under the assumption that the key is part of
        the name of a device file.

        :param Context context: the context
        :param str key: a portion of the device file name

        It is assumed that either it is the whole name of the device file
        or it is the basename.

        A device file may be a device node or a device link.
        """
        func = wrap_exception(Devices.from_device_file)
        if '/' in key:
            device = func(context, key)
            return frozenset((device,)) if device is not None else frozenset()
        else:
            files = (os.path.join(ld, key) for ld in cls._LINK_DIRS)
            devices = (func(context, f) for f in files)
            return frozenset(d for d in devices if d is not None)


class Discovery(object):
    # pylint: disable=too-few-public-methods
    """
    Provides discovery methods for devices.
    """

    _HYPOTHESES = [
       DeviceFileHypothesis,
       DeviceNameHypothesis,
       DeviceNumberHypothesis,
       DevicePathHypothesis
    ]

    def __init__(self):
        self._hypotheses = self._HYPOTHESES

    def setup(self, context):
        """
        Set up individual hypotheses.

        May be an expensive call.

        :param Context context: the context
        """
        for hyp in self._hypotheses:
            hyp.setup(context)

    def get_devices(self, context, value):
        """
        Get the devices corresponding to value.

        :param Context context: the context
        :param str value: some identifier of the device
        :returns: a list of corresponding devices
        :rtype: frozenset of :class:`Device`
        """
        return frozenset(
           d for h in self._hypotheses for d in h.get_devices(context, value)
        )