File: device_hints.py

package info (click to toggle)
ironic-python-agent 10.2.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,416 kB
  • sloc: python: 35,563; sh: 60; makefile: 29
file content (314 lines) | stat: -rw-r--r-- 13,403 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import re
from urllib import parse as urlparse

from oslo_log import log as logging
from oslo_utils import specs_matcher
from oslo_utils import strutils
from oslo_utils import units


LOG = logging.getLogger(__name__)


# Maps every supported root device hint name to the Python type its
# value is validated against ({hint name: hint type}).
VALID_ROOT_DEVICE_HINTS = {
    'size': int,
    'model': str,
    'wwn': str,
    'serial': str,
    'vendor': str,
    'wwn_with_extension': str,
    'wwn_vendor_extension': str,
    'name': str,
    'rotational': bool,
    'hctl': str,
    'by_path': str,
}

# Grammar used by this module to tell whether a hint expression starts
# with an operator (the operator, if any, is the first parsed token).
ROOT_DEVICE_HINTS_GRAMMAR = specs_matcher.make_grammar()


def _extract_hint_operator_and_values(hint_expression, hint_name):
    """Extract the operator and value(s) of a root device hint expression.

    A root device hint expression could contain one or more values
    depending on the operator. This method extracts the operator and
    value(s) and returns a dictionary containing both.

    :param hint_expression: The hint expression string containing value(s)
                            and operator (optionally).
    :param hint_name: The name of the hint. Used for logging.
    :raises: ValueError if the hint_expression is empty.
    :returns: A dictionary containing:

        :op: The operator. An empty string in case of None.
        :values: A list of values stripped and converted to lowercase.
    """
    expression = str(hint_expression).strip().lower()
    if not expression:
        raise ValueError(f'Root device hint {hint_name} expression is empty')

    # parseString() returns a list of tokens which the operator (if
    # present) is always the first element.
    ast = ROOT_DEVICE_HINTS_GRAMMAR.parseString(expression)
    if len(ast) <= 1:
        # hint_expression had no operator
        return {'op': '', 'values': [expression]}

    op = ast[0]
    return {'values': [v.strip() for v in re.split(op, expression) if v],
            'op': op}


def _normalize_hint_expression(hint_expression, hint_name):
    """Normalize a string type hint expression.

    A string-type hint expression has the form
    [<op>] <value> [<op> <value>]*. Each value is url-encoded (white
    spaces and special characters are quoted) while the operator is
    left untouched. For example, the hint value "<or> foo bar <or> bar"
    becomes "<or> foo%20bar <or> bar".

    :param hint_expression: The hint expression string containing value(s)
                            and operator (optionally).
    :param hint_name: The name of the hint, passed on to the expression
                      parser for its error message.
    :raises: ValueError if the hint_expression is empty.
    :returns: A normalized string.
    """
    parsed = _extract_hint_operator_and_values(hint_expression, hint_name)
    operator = parsed['op']

    # Pad every quoted value with spaces so the operator never touches
    # a value once the pieces are glued back together.
    padded_values = [f' {urlparse.quote(value)} '
                     for value in parsed['values']]

    normalized = operator + operator.join(padded_values)
    return normalized.strip()


def _append_operator_to_hints(root_device):
    """Add an equal (s== or ==) operator to the hints.

    For backwards compatibility, for root device hints where no operator
    means equal, this method adds the equal operator to the hint. This is
    needed when using oslo.utils.specs_matcher methods.

    :param root_device: The root device hints dictionary.
    """
    for name, expression in root_device.items():
        # NOTE(lucasagomes): The specs_matcher from oslo.utils does not
        # support boolean, so we don't need to append any operator
        # for it.
        if VALID_ROOT_DEVICE_HINTS[name] is bool:
            continue

        expression = str(expression)
        ast = ROOT_DEVICE_HINTS_GRAMMAR.parseString(expression)
        if len(ast) > 1:
            continue

        op = 's== %s' if VALID_ROOT_DEVICE_HINTS[name] is str else '== %s'
        root_device[name] = op % expression

    return root_device


def parse_root_device_hints(root_device):
    """Parse the root_device property of a node.

    Validates the root_device property of a node and returns a parsed
    copy of it; the dictionary passed in is not modified. These are
    hints for how a node's root device is created. String hints are
    normalized, every value of the 'size' hint must be a positive
    integer and the 'rotational' hint must be a Boolean value. Hints
    without an operator get the equality operator prepended.

    :param root_device: the root_device dictionary from the node's property.
    :returns: a dictionary with the root device hints parsed or
              None if there are no hints.
    :raises: ValueError, if some information is invalid.

    """
    if not root_device:
        return

    hints = copy.deepcopy(root_device)

    unknown_hints = set(hints) - set(VALID_ROOT_DEVICE_HINTS)
    if unknown_hints:
        raise ValueError('The hints "%(invalid_hints)s" are invalid. '
                         'Valid hints are: "%(valid_hints)s"' %
                         {'invalid_hints': ', '.join(unknown_hints),
                          'valid_hints': ', '.join(VALID_ROOT_DEVICE_HINTS)})

    for name, expression in hints.items():
        expected_type = VALID_ROOT_DEVICE_HINTS[name]
        # Shared substitution values for every error message below.
        details = {'name': name, 'expression': expression}

        if expected_type is str:
            if not isinstance(expression, str):
                raise ValueError(
                    'Root device hint "%(name)s" is not a string value. '
                    'Hint expression: %(expression)s' % details)
            hints[name] = _normalize_hint_expression(expression, name)
            continue

        if expected_type is int:
            # Only validated here; the expression itself is kept as is.
            parsed = _extract_hint_operator_and_values(expression, name)
            for raw_value in parsed['values']:
                try:
                    number = int(raw_value)
                except ValueError:
                    raise ValueError(
                        'Root device hint "%(name)s" is not an integer '
                        'value. Current value: %(expression)s' % details)

                if number <= 0:
                    raise ValueError(
                        'Root device hint "%(name)s" should be a positive '
                        'integer. Current value: %(expression)s' % details)
            continue

        if expected_type is bool:
            try:
                hints[name] = strutils.bool_from_string(
                    expression, strict=True)
            except ValueError:
                raise ValueError(
                    'Root device hint "%(name)s" is not a Boolean value. '
                    'Current value: %(expression)s' % details)

    return _append_operator_to_hints(hints)


def find_devices_by_hints(devices, root_device_hints):
    """Find all devices that match the root device hints.

    Try to find devices that match the root device hints. In order
    for a device to be matched it needs to satisfy all the given hints.
    When no hints are given there is nothing to satisfy, so every
    device is considered a match.

    This is a generator function: nothing (including the validation of
    the hints) is executed until the result is iterated.

    :param devices: A list of dictionaries representing the devices
                    containing one or more of the following keys:

        :name: (String) The device name, e.g /dev/sda
        :size: (Integer) Size of the device in *bytes*
        :model: (String) Device model
        :vendor: (String) Device vendor name
        :serial: (String) Device serial number
        :wwn: (String) Unique storage identifier
        :wwn_with_extension: (String): Unique storage identifier with
                             the vendor extension appended
        :wwn_vendor_extension: (String): Unique vendor storage identifier
        :rotational: (Boolean) Whether it's a rotational device or
                     not. Useful to distinguish HDDs (rotational) and SSDs
                     (not rotational).
        :hctl: (String): The SCSI address: Host, channel, target and lun.
                         For example: '1:0:0:0'.
        :by_path: (String): The alternative device name,
                  e.g. /dev/disk/by-path/pci-0000:00

    :param root_device_hints: A dictionary with the root device hints.
    :raises: ValueError, if some information is invalid.
    :returns: A generator with all matching devices as dictionaries.
    """
    # str() keeps the join from failing on a device without a name.
    LOG.debug('Trying to find devices from "%(devs)s" that match the '
              'device hints "%(hints)s"',
              {'devs': ', '.join([str(d.get('name')) for d in devices]),
               'hints': root_device_hints})
    # parse_root_device_hints() returns None when there are no hints;
    # use an empty mapping instead so the loop below does not fail.
    parsed_hints = parse_root_device_hints(root_device_hints) or {}
    for dev in devices:
        device_name = dev.get('name')

        for hint in parsed_hints:
            hint_type = VALID_ROOT_DEVICE_HINTS[hint]
            device_value = dev.get(hint)
            hint_value = parsed_hints[hint]

            if hint_type is str:
                # Normalize the device value the same way the hint was
                # normalized so both sides are comparable.
                try:
                    device_value = _normalize_hint_expression(device_value,
                                                              hint)
                except ValueError:
                    LOG.warning(
                        'The attribute "%(attr)s" of the device "%(dev)s" '
                        'has an empty value. Skipping device.',
                        {'attr': hint, 'dev': device_name})
                    break

            if hint == 'size':
                # A device that does not report its size cannot be
                # compared against the hint; skip it instead of failing
                # on the division below.
                if device_value is None:
                    LOG.warning(
                        'The attribute "%(attr)s" of the device "%(dev)s" '
                        'has an empty value. Skipping device.',
                        {'attr': hint, 'dev': device_name})
                    break

                # Since we don't support units yet we expect the size
                # in GiB for now
                device_value = device_value / units.Gi

            LOG.debug('Trying to match the device hint "%(hint)s" '
                      'with a value of "%(hint_value)s" against the same '
                      'device\'s (%(dev)s) attribute with a value of '
                      '"%(dev_value)s"', {'hint': hint, 'dev': device_name,
                                          'hint_value': hint_value,
                                          'dev_value': device_value})

            # NOTE(lucasagomes): Boolean hints are not supported by
            # specs_matcher.match(), so we need to do the comparison
            # ourselves
            if hint_type is bool:
                try:
                    device_value = strutils.bool_from_string(device_value,
                                                             strict=True)
                except ValueError:
                    LOG.warning('The attribute "%(attr)s" (with value '
                                '"%(value)s") of device "%(dev)s" is not '
                                'a valid Boolean. Skipping device.',
                                {'attr': hint, 'value': device_value,
                                 'dev': device_name})
                    break
                if device_value == hint_value:
                    continue

            elif specs_matcher.match(device_value, hint_value):
                continue

            # Reaching this point means the current hint did not match.
            LOG.debug('The attribute "%(attr)s" (with value "%(value)s") '
                      'of device "%(dev)s" does not match the hint %(hint)s',
                      {'attr': hint, 'value': device_value,
                       'dev': device_name, 'hint': hint_value})
            break
        else:
            # The inner loop finished without a break: all hints matched.
            yield dev


def match_root_device_hints(devices, root_device_hints):
    """Try to find a device that matches the root device hints.

    Return the first device, in the order given, that satisfies all
    the given hints. The outcome is logged either way.

    :param devices: A list of dictionaries representing the devices
                    containing one or more of the following keys:

        :name: (String) The device name, e.g /dev/sda
        :size: (Integer) Size of the device in *bytes*
        :model: (String) Device model
        :vendor: (String) Device vendor name
        :serial: (String) Device serial number
        :wwn: (String) Unique storage identifier
        :wwn_with_extension: (String): Unique storage identifier with
                             the vendor extension appended
        :wwn_vendor_extension: (String): Unique vendor storage identifier
        :rotational: (Boolean) Whether it's a rotational device or
                     not. Useful to distinguish HDDs (rotational) and SSDs
                     (not rotational).
        :hctl: (String): The SCSI address: Host, channel, target and lun.
                         For example: '1:0:0:0'.
        :by_path: (String): The alternative device name,
                  e.g. /dev/disk/by-path/pci-0000:00

    :param root_device_hints: A dictionary with the root device hints.
    :raises: ValueError, if some information is invalid.
    :returns: The first device to match all the hints or None.
    """
    # A private sentinel tells "no match" apart from any yielded value.
    no_match = object()
    matches = find_devices_by_hints(devices, root_device_hints)
    dev = next(matches, no_match)

    if dev is no_match:
        LOG.warning('No device found that matches the root device hints %s',
                    root_device_hints)
        return None

    LOG.info('Root device found! The device "%s" matches the root '
             'device hints %s', dev, root_device_hints)
    return dev