File: utils.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (185 lines) | stat: -rw-r--r-- 5,638 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import gzip
import hashlib
import json
import pkgutil
from collections.abc import Iterator, MutableMapping
from typing import Any, Optional, TypeVar
from uuid import UUID

from requests.structures import CaseInsensitiveDict as _CaseInsensitiveDict

# Partition returned whenever a region cannot be mapped to a more specific one.
DEFAULT_PARTITION = "aws"
# Region-name prefix -> partition name for regions starting with that prefix.
REGION_PREFIX_TO_PARTITION = {
    "cn-": "aws-cn",
    "us-gov-": "aws-us-gov",
    "us-iso-": "aws-iso",
    "us-isob-": "aws-iso-b",
}
# All known partition names: the prefixed ones first, the default one last.
PARTITION_NAMES = [*REGION_PREFIX_TO_PARTITION.values(), DEFAULT_PARTITION]
# Regex fragment matching "arn:" followed by any known partition (alphabetical alternation).
ARN_PARTITION_REGEX = "^arn:({})".format("|".join(sorted(PARTITION_NAMES)))


def get_partition(region: str) -> str:
    """
    Resolve the partition name for a region.

    An empty/falsy region yields DEFAULT_PARTITION. A value that is already one of
    PARTITION_NAMES is returned unchanged. Otherwise the first prefix in
    REGION_PREFIX_TO_PARTITION that the region starts with decides the partition,
    and DEFAULT_PARTITION is used when no prefix matches.
    """
    if not region:
        return DEFAULT_PARTITION
    if region in PARTITION_NAMES:
        # The caller passed a partition name rather than a region.
        return region
    prefix_matches = (
        partition
        for prefix, partition in REGION_PREFIX_TO_PARTITION.items()
        if region.startswith(prefix)
    )
    return next(prefix_matches, DEFAULT_PARTITION)


def str2bool(v: Any) -> Optional[bool]:
    """
    Translate a textual (or boolean) flag into a bool.

    Returns True/False for the recognised spellings below and None for anything else.
    Matching is done by equality, so values equal to True/False (such as 1 and 0)
    are recognised as well.
    """
    truthy = ("yes", True, "true", "True", "TRUE", "t", "1")
    falsy = ("no", False, "false", "False", "FALSE", "f", "0")
    if v in truthy:
        return True
    if v in falsy:
        return False
    return None


def load_resource(package: str, resource: str) -> Any:
    """
    Read a packaged file and return its contents parsed as JSON.

    The gzip-compressed variant (resources/file.json.gz) is tried first; when that
    file does not exist, the resource is read as given.
    Usage:
    load_resource(__name__, "resources/file.json")
    """
    gz_name = resource if resource.endswith(".gz") else f"{resource}.gz"
    try:
        packed = pkgutil.get_data(package, gz_name)
    except FileNotFoundError:
        # No compressed variant available: fall back to the resource as requested.
        payload = pkgutil.get_data(package, resource)  # type: ignore
    else:
        payload = gzip.decompress(packed)  # type: ignore
    return json.loads(payload)


def load_resource_as_str(package: str, resource: str) -> str:
    """Read a packaged file via load_resource_as_bytes and decode it as UTF-8 text."""
    raw = load_resource_as_bytes(package, resource)
    return raw.decode("utf-8")


def load_resource_as_bytes(package: str, resource: str) -> bytes:
    """Return the raw contents of a packaged file, as provided by pkgutil.get_data."""
    raw = pkgutil.get_data(package, resource)
    return raw  # type: ignore


def merge_multiple_dicts(*args: Any) -> dict[str, Any]:
    """
    Combine all given dicts into one new dict.

    Arguments are applied left to right, so for a key present in several of them
    the value from the last one wins. The inputs themselves are not modified.
    """
    merged: dict[str, Any] = {}
    for mapping in args:
        merged |= mapping
    return merged


def is_valid_uuid(uuid: str, version: int = 4) -> bool:
    """
    Check whether a value can be parsed as a UUID.

    :param uuid: candidate value, expected to be the textual form of a UUID
    :param version: UUID version handed to the ``UUID`` constructor (default 4)
    :return: True when ``UUID(uuid, version=version)`` succeeds, False otherwise

    Non-string input (for example ``None`` or an int) is reported as invalid
    instead of leaking the constructor's TypeError/AttributeError to the caller.

    NOTE(review): per the stdlib documentation the ``version`` argument makes the
    constructor set the version bits on the result rather than reject input of a
    different version - so this presumably does not verify the version itself.
    """
    try:
        UUID(uuid, version=version)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed text or illegal version number.
        # TypeError/AttributeError: the value is not a string at all.
        return False
    return True


RESOURCE_TYPE = TypeVar("RESOURCE_TYPE")


def _resource_matches_filters(
    resource: Any,
    filters: Any,
    attr_pairs: tuple[tuple[str, ...], ...],
) -> bool:
    """Return True when *resource* passes every filter described by *attr_pairs*."""
    for attrs in attr_pairs:
        filter_name = attrs[0]
        values = filters.get(filter_name)
        if not values:
            if filter_name in filters:
                # The filter exists but its value is empty: it must not match anything.
                return False
            # The filter was not supplied at all, so it places no constraint.
            continue
        instance = getattr(resource, attrs[1])
        if len(attrs) <= 2:
            # (filter name, attribute name): compare the attribute itself.
            if instance not in values:
                return False
        elif len(attrs) == 3 and instance.get(attrs[2]) not in values:
            # (filter name, attribute name, key): compare one entry of the attribute.
            return False
    return True


def filter_resources(
    resources: list[RESOURCE_TYPE],
    filters: Any,
    attr_pairs: tuple[tuple[str, ...], ...],
) -> list[RESOURCE_TYPE]:
    """
    Used to filter resources. Usually in get and describe apis.

    :param resources: the candidate resources; the input list is left untouched
    :param filters: mapping of filter name to the accepted values for that filter
    :param attr_pairs: tuples of ``(filter name, attribute name)`` or
        ``(filter name, attribute name, key)``; in the three-element form the
        attribute is looked up with ``.get(key)`` before comparing
    :return: a new list, in the original order, with the resources that satisfy
        every supplied filter. A filter that is present with an empty value
        excludes every resource.

    The result is built from the resources that match instead of removing the
    rejected ones from a copy: ``list.remove`` locates elements by equality, which
    could drop a different (matching) resource that merely compares equal to the
    rejected one, and it made the filtering quadratic.
    """
    return [
        resource
        for resource in resources
        if _resource_matches_filters(resource, filters, attr_pairs)
    ]


def md5_hash(data: Any = None) -> Any:
    """
    Create an MD5 hash object for non-security usecases.
    Required for Moto to work in FIPS-enabled systems

    When truthy data is supplied it is fed into the hash immediately; otherwise an
    empty hash object is returned.
    """
    if data:
        return hashlib.md5(data, usedforsecurity=False)  # type: ignore
    return hashlib.md5(usedforsecurity=False)  # type: ignore


class LowercaseDict(MutableMapping[str, Any]):
    """A mutable mapping that lowercases every key on write, read and delete"""

    def __init__(self, *args: Any, **kwargs: Any):
        # Backing dict; it only ever holds the lowercased form of a key.
        self.store: dict[str, Any] = {}
        initial = dict(*args, **kwargs)
        # The inherited update() routes each entry through __setitem__.
        self.update(initial)

    def _keytransform(self, key: str) -> str:
        """Normalise a key to the form used in the backing dict."""
        return key.lower()

    def __getitem__(self, key: str) -> Any:
        normalised = self._keytransform(key)
        return self.store[normalised]

    def __setitem__(self, key: str, value: Any) -> None:
        normalised = self._keytransform(key)
        self.store[normalised] = value

    def __delitem__(self, key: str) -> None:
        normalised = self._keytransform(key)
        del self.store[normalised]

    def __iter__(self) -> Iterator[Any]:
        # Iterates the stored (lowercased) keys.
        return iter(self.store)

    def __len__(self) -> int:
        return len(self.store)

    def __repr__(self) -> str:
        # Shown exactly like the backing dict.
        return str(self.store)


class CamelToUnderscoresWalker:
    """Walks nested dict/list data and returns a copy whose dict keys are converted from CamelCase to snake_case (underscores)"""

    @staticmethod
    def parse(x: Any) -> Any:  # type: ignore[misc]
        """Dispatch on the type of x: dicts and lists are walked, anything else is a scalar."""
        if isinstance(x, dict):
            handler = CamelToUnderscoresWalker.parse_dict
        elif isinstance(x, list):
            handler = CamelToUnderscoresWalker.parse_list
        else:
            handler = CamelToUnderscoresWalker.parse_scalar
        return handler(x)

    @staticmethod
    def parse_dict(x: dict[str, Any]) -> dict[str, Any]:  # type: ignore[misc]
        """Build a new dict with converted keys and recursively parsed values."""
        from moto.core.utils import camelcase_to_underscores

        return {
            camelcase_to_underscores(key): CamelToUnderscoresWalker.parse(x[key])
            for key in x
        }

    @staticmethod
    def parse_list(x: Any) -> Any:  # type: ignore[misc]
        """Build a new list with every element recursively parsed."""
        return [CamelToUnderscoresWalker.parse(item) for item in x]

    @staticmethod
    def parse_scalar(x: Any) -> Any:  # type: ignore[misc]
        """Scalars are returned unchanged."""
        return x


class CaseInsensitiveDict(_CaseInsensitiveDict):  # type: ignore[type-arg]
    """Proxy for requests.structures.CaseInsensitiveDict; adds no behaviour of its own"""