File: utils.py

package info (click to toggle)
python-openstacksdk 4.4.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 13,352 kB
  • sloc: python: 122,960; sh: 153; makefile: 23
file content (679 lines) | stat: -rw-r--r-- 22,686 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections.abc
import hashlib
import io
import queue
import string
import threading
import time
import typing as ty

import keystoneauth1
from keystoneauth1 import adapter as ks_adapter
from keystoneauth1 import discover

from openstack import _log
from openstack import exceptions


def urljoin(*args: ty.Optional[str]) -> str:
    """Join string fragments into a single slash-separated path.

    Unlike the stdlib urljoin, this applies no web semantics (an absolute
    path such as ``/path`` does not replace the preceding base); each
    fragment merely has surrounding slashes trimmed before joining, which
    is what we want when building client request paths. ``None`` fragments
    become empty strings.
    """
    cleaned = []
    for piece in args:
        cleaned.append(str(piece or '').strip('/'))
    return '/'.join(cleaned)


def iterate_timeout(
    timeout: ty.Optional[int],
    message: str,
    wait: ty.Union[int, float, None] = 2,
) -> ty.Generator[int, None, None]:
    """Iterate and raise an exception on timeout.

    This is a generator that will continually yield and sleep for
    wait seconds, and if the timeout is reached, will raise an exception
    with <message>.

    :param timeout: Maximum number of seconds to wait for transition. Set to
        ``None`` to wait forever.
    :param message: The message to use for the exception if the timeout is
        reached.
    :param wait: Number of seconds to wait between checks. Set to ``None``
        to use the default interval.

    :returns: None
    :raises: :class:`~openstack.exceptions.ResourceTimeout` transition
    :raises: :class:`~openstack.exceptions.SDKException` if ``wait`` is not a
        valid float, integer or None.
    """
    log = _log.setup_logging('openstack.iterate_timeout')

    try:
        # None as a wait winds up flowing well in the per-resource cache
        # flow. We could spread this logic around to all of the calling
        # points, but just having this treat None as "I don't have a value"
        # seems friendlier
        if wait is None:
            wait = 2
        elif wait == 0:
            # wait should be < timeout, unless timeout is None
            wait = 0.1 if timeout is None else min(0.1, timeout)
        wait = float(wait)
    except (TypeError, ValueError):
        # float() raises ValueError for unparsable strings but TypeError for
        # non-numeric types (lists, dicts, ...); both mean invalid input and
        # must surface as the documented SDKException.
        raise exceptions.SDKException(
            f"Wait value must be an int or float value. {wait!r} given instead"
        )

    start = time.time()
    count = 0
    while (timeout is None) or (time.time() < start + timeout):
        count += 1
        yield count
        log.debug('Waiting %s seconds', wait)
        time.sleep(wait)
    raise exceptions.ResourceTimeout(message)


class _AccessSaver:
    """Mapping stand-in that records every key looked up on it."""

    __slots__ = ('keys',)

    def __init__(self) -> None:
        # Keys requested via __getitem__, in access order.
        self.keys: list[str] = []

    def __getitem__(self, key: str) -> None:
        # Only record the access; old-style %-formatting merely needs the
        # lookup to succeed, so returning None is sufficient.
        self.keys.append(key)

def get_string_format_keys(
    fmt_string: str, old_style: bool = True
) -> list[str]:
    """Return the replacement keys a format string requires.

    Needed mostly for parsing ``base_path`` URLs for their required keys,
    which use old-style (``%``) string formatting.
    """
    if not old_style:
        # New-style: string.Formatter().parse yields
        # (literal, field_name, spec, conversion) tuples; field_name is
        # None for trailing literal-only chunks.
        return [
            field
            for _, field, _, _ in string.Formatter().parse(fmt_string)
            if field is not None
        ]

    # Old-style: formatting against the recorder captures every key lookup.
    recorder = _AccessSaver()
    fmt_string % recorder
    return recorder.keys


def supports_version(
    adapter: ks_adapter.Adapter,
    version: str,
    raise_exception: bool = False,
) -> bool:
    """Determine if the given adapter supports the given version.

    Checks the version asserted by the service and ensures this matches the
    provided version. ``version`` can be a major version or a major-minor
    version

    :param adapter: :class:`~keystoneauth1.adapter.Adapter` instance.
    :param version: String containing the desired version.
    :param raise_exception: Raise exception when requested version
        is not supported by the server.
    :returns: ``True`` if the service supports the version, else ``False``.
    :raises: :class:`~openstack.exceptions.SDKException` when
        ``raise_exception`` is ``True`` and requested version is not supported.
    """

    def _check() -> bool:
        wanted = discover.normalize_version_number(version)
        asserted = adapter.get_api_major_version()
        # A service that asserts no major version cannot support anything.
        if not asserted:
            return False
        return bool(discover.version_match(wanted, asserted))

    supported = _check()

    if raise_exception and not supported:
        raise exceptions.SDKException(
            f'Required version {version} is not supported by the server'
        )

    return supported


def supports_microversion(
    adapter: ks_adapter.Adapter,
    microversion: ty.Union[
        str, int, float, ty.Iterable[ty.Union[str, int, float]]
    ],
    raise_exception: bool = False,
) -> bool:
    """Determine if the given adapter supports the given microversion.

    Checks the min and max microversion asserted by the service and ensures
    ``min <= microversion <= max``. If set, the current default microversion is
    taken into consideration to ensure ``microversion <= default``.

    :param adapter: :class:`~keystoneauth1.adapter.Adapter` instance.
    :param microversion: String containing the desired microversion.
    :param raise_exception: Raise exception when requested microversion
        is not supported by the server or is higher than the current default
        microversion.
    :returns: True if the service supports the microversion, else False.
    :raises: :class:`~openstack.exceptions.SDKException` when
        ``raise_exception`` is ``True`` and requested microversion is not
        supported.
    """
    endpoint_data = adapter.get_endpoint_data()
    if endpoint_data is None:
        if raise_exception:
            raise exceptions.SDKException('Could not retrieve endpoint data')
        return False

    # The server must advertise both bounds and the requested microversion
    # must fall within them.
    server_supports = bool(
        endpoint_data.min_microversion
        and endpoint_data.max_microversion
        and discover.version_between(
            endpoint_data.min_microversion,
            endpoint_data.max_microversion,
            microversion,
        )
    )
    if not server_supports:
        if raise_exception:
            raise exceptions.SDKException(
                f'Required microversion {microversion} is not supported '
                f'by the server side'
            )
        return False

    if adapter.default_microversion is None:
        return True

    # A default microversion is pinned on the adapter: the request must also
    # match that default to be usable.
    candidate = discover.normalize_version_number(
        adapter.default_microversion
    )
    required = discover.normalize_version_number(microversion)
    supports = discover.version_match(required, candidate)
    if raise_exception and not supports:
        raise exceptions.SDKException(
            f'Required microversion {microversion} is higher than '
            f'currently selected {adapter.default_microversion}'
        )
    return supports  # type: ignore[no-any-return]


def require_microversion(adapter: ks_adapter.Adapter, required: str) -> None:
    """Require a microversion, raising when it is unavailable.

    Thin wrapper over :func:`supports_microversion` that always raises on
    an unsupported microversion.

    :param adapter: :class:`~keystoneauth1.adapter.Adapter` instance.
    :param required: String containing the desired microversion.
    :raises: :class:`~openstack.exceptions.SDKException` when requested
        microversion is not supported
    """
    supports_microversion(adapter, required, raise_exception=True)


def pick_microversion(
    session: ks_adapter.Adapter, required: str
) -> ty.Optional[str]:
    """Get a new microversion if it is higher than session's default.

    :param session: The session to use for making this request.
    :param required: Minimum version that is required for an action.
    :return: ``required`` as a string if the ``session``'s default is too low,
        otherwise the ``session``'s default. Returns ``None`` if both
        are ``None``.
    :raises: TypeError if ``required`` is invalid.
    :raises: :class:`~openstack.exceptions.SDKException` if requested
        microversion is not supported.
    """
    wanted = None
    if required is not None:
        wanted = discover.normalize_version_number(required)

    if session.default_microversion is not None:
        default = discover.normalize_version_number(
            session.default_microversion
        )
        if wanted is None:
            wanted = default
        elif discover.version_match(wanted, default):
            # The session default already satisfies the requirement, so
            # prefer it over the explicitly requested version.
            wanted = default

    if wanted is None:
        return None

    if not supports_microversion(session, wanted):
        raise exceptions.SDKException(
            'Requested microversion is not supported by the server side '
            'or the default microversion is too low'
        )
    return discover.version_to_string(wanted)  # type: ignore[no-any-return]


def maximum_supported_microversion(
    adapter: ks_adapter.Adapter,
    client_maximum: ty.Optional[str],
) -> ty.Optional[str]:
    """Determine the maximum microversion supported by both client and server.

    :param adapter: :class:`~keystoneauth1.adapter.Adapter` instance.
    :param client_maximum: Maximum microversion supported by the client.
        If ``None``, ``None`` is returned.

    :returns: the maximum supported microversion as string or ``None``.
    """
    if client_maximum is None:
        return None

    # NOTE(dtantsur): if we cannot determine supported microversions, fall back
    # to the default one.
    try:
        endpoint_data = adapter.get_endpoint_data()
    except keystoneauth1.exceptions.discovery.DiscoveryFailure:
        endpoint_data = None

    if endpoint_data is None:
        log = _log.setup_logging('openstack')
        log.warning(
            'Cannot determine endpoint data for service %s',
            adapter.service_type or adapter.service_name,
        )
        return None

    server_maximum = endpoint_data.max_microversion
    if not server_maximum:
        # The service does not advertise microversion support at all.
        return None

    client_max = discover.normalize_version_number(client_maximum)
    server_max = discover.normalize_version_number(server_maximum)

    if endpoint_data.min_microversion:
        server_min = discover.normalize_version_number(
            endpoint_data.min_microversion
        )
        if client_max < server_min:
            # NOTE(dtantsur): we may want to raise in this case, but this keeps
            # the current behavior intact.
            return None

    return discover.version_to_string(min(client_max, server_max))  # type: ignore[no-any-return]


def _hashes_up_to_date(
    md5: ty.Optional[str],
    sha256: ty.Optional[str],
    md5_key: str,
    sha256_key: str,
) -> bool:
    """Compare md5 and sha256 hashes for being up to date

    ``md5`` and ``sha256`` are the current values; ``md5_key`` and
    ``sha256_key`` are the previously stored values. The result is True
    only when at least one provided hash matches its stored counterpart
    and no provided hash disagrees with it.
    """
    # Any provided hash that disagrees with the stored value wins: stale.
    if md5 and md5_key != md5:
        return False
    if sha256 and sha256_key != sha256:
        return False
    # Current only if at least one hash was provided and actually matched.
    return bool(
        (md5 and md5_key == md5) or (sha256 and sha256_key == sha256)
    )


def _calculate_data_hashes(
    data: ty.Union[io.BufferedIOBase, bytes],
) -> tuple[str, str]:
    """Compute the MD5 and SHA256 hex digests of ``data``.

    :param data: Either a binary stream (consumed in 8 KiB chunks so large
        files are never fully loaded into memory) or a bytes object hashed
        in one pass.
    :returns: Tuple of ``(md5_hexdigest, sha256_hexdigest)``.
    :raises TypeError: if ``data`` is neither a binary stream nor bytes.
    """
    # MD5 here is an integrity checksum, not a security measure, hence
    # usedforsecurity=False (keeps FIPS-enabled platforms happy).
    _md5 = hashlib.md5(usedforsecurity=False)
    _sha256 = hashlib.sha256()

    if isinstance(data, io.BufferedIOBase):
        for chunk in iter(lambda: data.read(8192), b''):
            _md5.update(chunk)
            _sha256.update(chunk)
    elif isinstance(data, bytes):
        _md5.update(data)
        _sha256.update(data)
    else:
        # Bug fix: this message was a plain string missing the 'f' prefix,
        # so '{type(data)}' was emitted verbatim instead of the actual type.
        raise TypeError(
            f'unsupported type for data; expected IO stream or bytes; got '
            f'{type(data)}'
        )

    return _md5.hexdigest(), _sha256.hexdigest()


def _get_file_hashes(filename: str) -> tuple[str, str]:
    """Return the ``(md5, sha256)`` hex digests of a file's contents."""
    # Open in binary mode; hashing happens chunk-wise in the helper so the
    # whole file is never read into memory at once.
    with open(filename, 'rb') as file_obj:
        return _calculate_data_hashes(file_obj)


class TinyDAG:
    """Tiny DAG

    Bases on the Kahn's algorithm, and enables parallel visiting of the nodes
    (parallel execution of the workflow items).

    Iterating the DAG emits nodes whose dependencies are all satisfied;
    consumers call :meth:`node_done` to unlock the dependents of a node,
    which allows multiple workers to drain the queue concurrently.
    """

    def __init__(self) -> None:
        self._reset()
        self._lock = threading.Lock()

    def _reset(self) -> None:
        # Adjacency mapping: node -> set of nodes that depend on it.
        self._graph: dict[str, set[str]] = {}
        # Seconds to block waiting for the next ready node during iteration.
        self._wait_timeout = 120

    @property
    def graph(self) -> dict[str, set[str]]:
        """Get graph as adjacency dict"""
        return self._graph

    def add_node(self, node: str) -> None:
        """Register a node, preserving any edges it already has."""
        self._graph.setdefault(node, set())

    def add_edge(self, u: str, v: str) -> None:
        """Add a directed edge ``u -> v`` (``v`` depends on ``u``)."""
        self._graph[u].add(v)

    def walk(self, timeout: ty.Optional[int] = None) -> 'TinyDAG':
        """Start the walking from the beginning."""
        # Zero/None leave the default wait timeout in place.
        if timeout:
            self._wait_timeout = timeout
        return self

    def __iter__(self) -> 'TinyDAG':
        self._start_traverse()
        return self

    def __next__(self) -> str:
        # Stop once every node has been emitted exactly once (counting
        # down from the graph size to zero).
        if self._it_cnt <= 0:
            raise StopIteration
        self._it_cnt -= 1
        try:
            return self._queue.get(block=True, timeout=self._wait_timeout)
        except queue.Empty:
            # No node became ready in time - a consumer likely stalled.
            raise exceptions.SDKException(
                'Timeout waiting for cleanup task to complete'
            )

    def node_done(self, node: str) -> None:
        """Mark node as "processed" and put following items into the queue"""
        self._done.add(node)

        # Each dependent loses one unmet dependency; once none remain the
        # dependent becomes ready and is queued for processing.
        for successor in self._graph[node]:
            self._run_in_degree[successor] -= 1
            if not self._run_in_degree[successor]:
                self._queue.put(successor)

    def _start_traverse(self) -> None:
        """Initialize graph traversing"""
        self._run_in_degree = self._get_in_degree()
        self._queue: queue.Queue[str] = queue.Queue()
        self._done: set[str] = set()
        self._it_cnt = len(self._graph)

        # Seed the queue with nodes that have no dependencies at all.
        for node, degree in self._run_in_degree.items():
            if degree == 0:
                self._queue.put(node)

    def _get_in_degree(self) -> dict[str, int]:
        """Calculate the in_degree (count incoming) for nodes"""
        counts: dict[str, int] = dict.fromkeys(self._graph, 0)
        for source in self._graph:
            for target in self._graph[source]:
                counts[target] += 1

        return counts

    def topological_sort(self) -> list[str]:
        """Return the graph nodes in the topological order"""
        ordered = []
        for node in self:
            ordered.append(node)
            self.node_done(node)

        return ordered

    def size(self) -> int:
        """Number of nodes in the graph."""
        return len(self._graph)

    def is_complete(self) -> bool:
        """Whether every node has been marked done."""
        return len(self._done) == self.size()


# Importing Munch is a relatively expensive operation (0.3s) while we do not
# really even need much of it. Before we can rework all places where we rely on
# it we can have a reduced version.
class Munch(dict[str, ty.Any]):
    """A slightly stripped version of munch.Munch class

    A dict whose items are additionally reachable as attributes:
    ``m.key`` is equivalent to ``m['key']`` for keys that do not clash
    with real attributes.
    """

    def __init__(self, *args: ty.Any, **kwargs: ty.Any):
        # Route through update() so a subclass __setitem__ also applies to
        # items passed to the constructor.
        self.update(*args, **kwargs)

    # only called if k not found in normal places
    def __getattr__(self, k: str) -> ty.Any:
        """Gets key if it exists, otherwise throws AttributeError."""
        try:
            # Genuine attributes (methods, properties) take precedence
            # over dict keys.
            return object.__getattribute__(self, k)
        except AttributeError:
            try:
                return self[k]
            except KeyError:
                # Surface a missing key with attribute-access semantics.
                raise AttributeError(k)

    def __setattr__(self, k: str, v: ty.Any) -> None:
        """Sets attribute k if it exists, otherwise sets key k. A KeyError
        raised by set-item (only likely if you subclass Munch) will
        propagate as an AttributeError instead.
        """
        try:
            # Throws exception if not in prototype chain
            object.__getattribute__(self, k)
        except AttributeError:
            try:
                self[k] = v
            except Exception:
                raise AttributeError(k)
        else:
            # k is a real attribute (e.g. a property); set it normally.
            object.__setattr__(self, k, v)

    def __delattr__(self, k: str) -> None:
        """Deletes attribute k if it exists, otherwise deletes key k.

        A KeyError raised by deleting the key - such as when the key is missing
        - will propagate as an AttributeError instead.
        """
        try:
            # Throws exception if not in prototype chain
            object.__getattribute__(self, k)
        except AttributeError:
            try:
                del self[k]
            except KeyError:
                raise AttributeError(k)
        else:
            # k is a real attribute; delete it normally.
            object.__delattr__(self, k)

    def toDict(self) -> dict[str, ty.Any]:
        """Recursively converts a munch back into a dictionary."""
        return unmunchify(self)

    @property
    def __dict__(self) -> dict[str, ty.Any]:  # type: ignore[override]
        # Overridden so vars(munch) yields a plain-dict view of the contents.
        return self.toDict()

    def __repr__(self) -> str:
        """Invertible* string-form of a Munch."""
        return f'{self.__class__.__name__}({dict.__repr__(self)})'

    def __dir__(self) -> list[str]:
        # Expose keys for tab-completion / introspection as attributes.
        return list(self.keys())

    def __getstate__(self) -> dict[str, ty.Any]:
        """Implement a serializable interface used for pickling.
        See https://docs.python.org/3.6/library/pickle.html.
        """
        return {k: v for k, v in self.items()}

    def __setstate__(self, state: dict[str, ty.Any]) -> None:
        """Implement a serializable interface used for pickling.
        See https://docs.python.org/3.6/library/pickle.html.
        """
        self.clear()
        self.update(state)

    # TODO(stephenfin): This needs to be stricter in the types that it will
    # accept. By limiting it to the primitive types (or subclasses of same) we
    # should cover everything we (sdk) care about and will be able to type the
    # results.
    @classmethod
    def fromDict(cls, d: dict[str, ty.Any]) -> 'Munch':
        """Recursively transforms a dictionary into a Munch via copy."""
        # Munchify x, using `seen` to track object cycles
        seen: dict[int, ty.Any] = dict()

        def munchify_cycles(obj: ty.Any) -> ty.Any:
            # Return the previously-created copy for objects seen before,
            # so shared references and cycles are preserved in the output.
            try:
                return seen[id(obj)]
            except KeyError:
                pass

            # Register the (still-empty) copy BEFORE filling it in, so that
            # self-references resolve via the `seen` cache above.
            seen[id(obj)] = partial = pre_munchify(obj)
            return post_munchify(partial, obj)

        def pre_munchify(obj: ty.Any) -> ty.Any:
            # Create an empty container of the appropriate output type.
            if isinstance(obj, collections.abc.Mapping):
                return cls({})
            elif isinstance(obj, list):
                return type(obj)()
            elif isinstance(obj, tuple):
                # Tuples are immutable and must be built in one shot;
                # namedtuples are rebuilt via their _make classmethod.
                type_factory = getattr(obj, "_make", type(obj))
                return type_factory(munchify_cycles(item) for item in obj)
            else:
                return obj

        def post_munchify(partial: ty.Any, obj: ty.Any) -> ty.Any:
            # Fill in the pre-created container with converted children.
            if isinstance(obj, collections.abc.Mapping):
                partial.update(
                    (k, munchify_cycles(obj[k])) for k in obj.keys()
                )
            elif isinstance(obj, list):
                partial.extend(munchify_cycles(item) for item in obj)
            elif isinstance(obj, tuple):
                for item_partial, item in zip(partial, obj):
                    post_munchify(item_partial, item)

            return partial

        return ty.cast('Munch', munchify_cycles(d))

    def copy(self) -> 'Munch':
        return self.fromDict(self)

    def update(self, *args: ty.Any, **kwargs: ty.Any) -> None:
        """
        Override built-in method to call custom __setitem__ method that may
        be defined in subclasses.
        """
        for k, v in dict(*args, **kwargs).items():
            self[k] = v

    def get(self, k: str, d: ty.Any = None) -> ty.Any:
        """
        D.get(k[,d]) -> D[k] if k in D, else d.  d defaults to None.
        """
        if k not in self:
            return d
        return self[k]

    def setdefault(self, k: str, d: ty.Any = None) -> ty.Any:
        """
        D.setdefault(k[,d]) -> D.get(k,d), also set D[k]=d if k not in D
        """
        if k not in self:
            self[k] = d
        return self[k]


def munchify(x: dict[str, ty.Any], factory: type[Munch] = Munch) -> Munch:
    """Recursively transforms a dictionary into a Munch via copy.

    :param x: The mapping to transform.
    :param factory: The :class:`Munch` (sub)class to instantiate for every
        nested mapping. Defaults to :class:`Munch`, preserving the previous
        behavior.
    """
    # Bug fix: 'factory' was accepted but ignored - the call was hard-coded
    # to Munch.fromDict, so passing a Munch subclass had no effect.
    return factory.fromDict(x)


def unmunchify(x: Munch) -> dict[str, ty.Any]:
    """Recursively converts a Munch into a dictionary."""

    # Track already-converted objects by id() so shared references and
    # cycles map onto the same output object instead of recursing forever.
    seen: dict[int, ty.Any] = dict()

    def unmunchify_cycles(obj: ty.Any) -> ty.Any:
        if id(obj) in seen:
            return seen[id(obj)]

        # Register the (still-empty) shell before filling it, so that
        # self-references resolve via `seen` above.
        seen[id(obj)] = shell = pre_unmunchify(obj)
        return post_unmunchify(shell, obj)

    def pre_unmunchify(obj: ty.Any) -> ty.Any:
        # Create an empty container of the appropriate output type;
        # contents are filled in post_unmunchify.
        if isinstance(obj, collections.abc.Mapping):
            return dict()
        if isinstance(obj, list):
            return type(obj)()
        if isinstance(obj, tuple):
            # Tuples are immutable so they must be built eagerly;
            # namedtuples are rebuilt via their _make classmethod.
            type_factory = getattr(obj, "_make", type(obj))
            return type_factory(unmunchify_cycles(item) for item in obj)
        return obj

    def post_unmunchify(shell: ty.Any, obj: ty.Any) -> ty.Any:
        # Fill in the pre-created container with converted children.
        if isinstance(obj, collections.abc.Mapping):
            shell.update((k, unmunchify_cycles(obj[k])) for k in obj.keys())
        elif isinstance(obj, list):
            shell.extend(unmunchify_cycles(v) for v in obj)
        elif isinstance(obj, tuple):
            for value_shell, value in zip(shell, obj):
                post_unmunchify(value_shell, value)

        return shell

    return ty.cast(dict[str, ty.Any], unmunchify_cycles(x))