File: models.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (208 lines) | stat: -rw-r--r-- 8,096 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import datetime
import json
import re
from collections import OrderedDict
from typing import Any, Optional

from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import utcnow

from .exceptions import InvalidNameException, ResourceNotFoundError
from .utils import get_random_identity_id


class CognitoIdentityPool(BaseModel):
    def __init__(self, region: str, identity_pool_name: str, **kwargs: Any):
        self.identity_pool_name = identity_pool_name

        if not re.fullmatch(r"[\w\s+=,.@-]+", identity_pool_name):
            raise InvalidNameException(identity_pool_name)

        self.allow_unauthenticated_identities = kwargs.get(
            "allow_unauthenticated_identities", ""
        )
        self.supported_login_providers = kwargs.get("supported_login_providers", {})
        self.developer_provider_name = kwargs.get("developer_provider_name", "")
        self.open_id_connect_provider_arns = kwargs.get(
            "open_id_connect_provider_arns", []
        )
        self.cognito_identity_providers = kwargs.get("cognito_identity_providers", [])
        self.saml_provider_arns = kwargs.get("saml_provider_arns", [])

        self.identity_pool_id = get_random_identity_id(region)
        self.creation_time = utcnow()

        self.tags = kwargs.get("tags") or {}

    def to_json(self) -> str:
        return json.dumps(
            {
                "IdentityPoolId": self.identity_pool_id,
                "IdentityPoolName": self.identity_pool_name,
                "AllowUnauthenticatedIdentities": self.allow_unauthenticated_identities,
                "SupportedLoginProviders": self.supported_login_providers,
                "DeveloperProviderName": self.developer_provider_name,
                "OpenIdConnectProviderARNs": self.open_id_connect_provider_arns,
                "CognitoIdentityProviders": self.cognito_identity_providers,
                "SamlProviderARNs": self.saml_provider_arns,
                "IdentityPoolTags": self.tags,
            }
        )


class CognitoIdentityBackend(BaseBackend):
    def __init__(self, region_name: str, account_id: str):
        super().__init__(region_name, account_id)
        self.identity_pools: dict[str, CognitoIdentityPool] = OrderedDict()
        self.pools_identities: dict[str, dict[str, Any]] = {}

    def describe_identity_pool(self, identity_pool_id: str) -> str:
        identity_pool = self.identity_pools.get(identity_pool_id, None)

        if not identity_pool:
            raise ResourceNotFoundError(identity_pool_id)

        return identity_pool.to_json()

    def create_identity_pool(
        self,
        identity_pool_name: str,
        allow_unauthenticated_identities: bool,
        supported_login_providers: dict[str, str],
        developer_provider_name: str,
        open_id_connect_provider_arns: list[str],
        cognito_identity_providers: list[dict[str, Any]],
        saml_provider_arns: list[str],
        tags: dict[str, str],
    ) -> str:
        new_identity = CognitoIdentityPool(
            self.region_name,
            identity_pool_name,
            allow_unauthenticated_identities=allow_unauthenticated_identities,
            supported_login_providers=supported_login_providers,
            developer_provider_name=developer_provider_name,
            open_id_connect_provider_arns=open_id_connect_provider_arns,
            cognito_identity_providers=cognito_identity_providers,
            saml_provider_arns=saml_provider_arns,
            tags=tags,
        )
        self.identity_pools[new_identity.identity_pool_id] = new_identity
        self.pools_identities.update(
            {
                new_identity.identity_pool_id: {
                    "IdentityPoolId": new_identity.identity_pool_id,
                    "Identities": [],
                }
            }
        )
        return new_identity.to_json()

    def update_identity_pool(
        self,
        identity_pool_id: str,
        identity_pool_name: str,
        allow_unauthenticated: Optional[bool],
        login_providers: Optional[dict[str, str]],
        provider_name: Optional[str],
        provider_arns: Optional[list[str]],
        identity_providers: Optional[list[dict[str, Any]]],
        saml_providers: Optional[list[str]],
        tags: Optional[dict[str, str]],
    ) -> str:
        """
        The AllowClassic-parameter has not yet been implemented
        """
        pool = self.identity_pools[identity_pool_id]
        pool.identity_pool_name = pool.identity_pool_name or identity_pool_name
        if allow_unauthenticated is not None:
            pool.allow_unauthenticated_identities = allow_unauthenticated
        if login_providers is not None:
            pool.supported_login_providers = login_providers
        if provider_name:
            pool.developer_provider_name = provider_name
        if provider_arns is not None:
            pool.open_id_connect_provider_arns = provider_arns
        if identity_providers is not None:
            pool.cognito_identity_providers = identity_providers
        if saml_providers is not None:
            pool.saml_provider_arns = saml_providers
        if tags:
            pool.tags = tags

        return pool.to_json()

    def get_id(self, identity_pool_id: str) -> str:
        # This call does not have to be authenticated, which means we do not know to which region it was sent originally
        # But the identity_pool_id is always prefixed with the region, so we just that to determine the right region
        #
        # Note that this does mean that we lose a potential error scenario,
        # where the user requests an ID for identity pool `us-west-1:...` in us-west-2
        # But because we don't always know that the request was sent to us-west-2, there's nothing we can do
        #
        region = identity_pool_id.split(":")[0]
        backend: CognitoIdentityBackend = cognitoidentity_backends[self.account_id][
            region
        ]

        identity_id = {"IdentityId": get_random_identity_id(self.region_name)}
        backend.pools_identities[identity_pool_id]["Identities"].append(identity_id)
        return json.dumps(identity_id)

    def get_credentials_for_identity(self, identity_id: str) -> str:
        duration = 90
        now = utcnow()
        expiration = now + datetime.timedelta(seconds=duration)
        return json.dumps(
            {
                "Credentials": {
                    "AccessKeyId": "TESTACCESSKEY12345",
                    "Expiration": expiration.timestamp(),
                    "SecretKey": "ABCSECRETKEY",
                    "SessionToken": "ABC12345",
                },
                "IdentityId": identity_id,
            }
        )

    def get_open_id_token_for_developer_identity(self, identity_id: str) -> str:
        return json.dumps(
            {
                "IdentityId": identity_id,
                "Token": get_random_identity_id(self.region_name),
            }
        )

    def get_open_id_token(self, identity_id: str) -> str:
        return json.dumps(
            {
                "IdentityId": identity_id,
                "Token": get_random_identity_id(self.region_name),
            }
        )

    def list_identities(self, identity_pool_id: str) -> str:
        """
        The MaxResults-parameter has not yet been implemented
        """
        return json.dumps(self.pools_identities[identity_pool_id])

    def list_identity_pools(self) -> str:
        """
        The MaxResults-parameter has not yet been implemented
        """
        return json.dumps(
            {
                "IdentityPools": [
                    json.loads(pool.to_json()) for pool in self.identity_pools.values()
                ]
            }
        )

    def delete_identity_pool(self, identity_pool_id: str) -> None:
        self.describe_identity_pool(identity_pool_id)

        del self.identity_pools[identity_pool_id]


cognitoidentity_backends = BackendDict(CognitoIdentityBackend, "cognito-identity")