1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
|
import datetime
import json
import re
from collections import OrderedDict
from typing import Any, Optional
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import utcnow
from .exceptions import InvalidNameException, ResourceNotFoundError
from .utils import get_random_identity_id
class CognitoIdentityPool(BaseModel):
def __init__(self, region: str, identity_pool_name: str, **kwargs: Any):
self.identity_pool_name = identity_pool_name
if not re.fullmatch(r"[\w\s+=,.@-]+", identity_pool_name):
raise InvalidNameException(identity_pool_name)
self.allow_unauthenticated_identities = kwargs.get(
"allow_unauthenticated_identities", ""
)
self.supported_login_providers = kwargs.get("supported_login_providers", {})
self.developer_provider_name = kwargs.get("developer_provider_name", "")
self.open_id_connect_provider_arns = kwargs.get(
"open_id_connect_provider_arns", []
)
self.cognito_identity_providers = kwargs.get("cognito_identity_providers", [])
self.saml_provider_arns = kwargs.get("saml_provider_arns", [])
self.identity_pool_id = get_random_identity_id(region)
self.creation_time = utcnow()
self.tags = kwargs.get("tags") or {}
def to_json(self) -> str:
return json.dumps(
{
"IdentityPoolId": self.identity_pool_id,
"IdentityPoolName": self.identity_pool_name,
"AllowUnauthenticatedIdentities": self.allow_unauthenticated_identities,
"SupportedLoginProviders": self.supported_login_providers,
"DeveloperProviderName": self.developer_provider_name,
"OpenIdConnectProviderARNs": self.open_id_connect_provider_arns,
"CognitoIdentityProviders": self.cognito_identity_providers,
"SamlProviderARNs": self.saml_provider_arns,
"IdentityPoolTags": self.tags,
}
)
class CognitoIdentityBackend(BaseBackend):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.identity_pools: dict[str, CognitoIdentityPool] = OrderedDict()
self.pools_identities: dict[str, dict[str, Any]] = {}
def describe_identity_pool(self, identity_pool_id: str) -> str:
identity_pool = self.identity_pools.get(identity_pool_id, None)
if not identity_pool:
raise ResourceNotFoundError(identity_pool_id)
return identity_pool.to_json()
def create_identity_pool(
self,
identity_pool_name: str,
allow_unauthenticated_identities: bool,
supported_login_providers: dict[str, str],
developer_provider_name: str,
open_id_connect_provider_arns: list[str],
cognito_identity_providers: list[dict[str, Any]],
saml_provider_arns: list[str],
tags: dict[str, str],
) -> str:
new_identity = CognitoIdentityPool(
self.region_name,
identity_pool_name,
allow_unauthenticated_identities=allow_unauthenticated_identities,
supported_login_providers=supported_login_providers,
developer_provider_name=developer_provider_name,
open_id_connect_provider_arns=open_id_connect_provider_arns,
cognito_identity_providers=cognito_identity_providers,
saml_provider_arns=saml_provider_arns,
tags=tags,
)
self.identity_pools[new_identity.identity_pool_id] = new_identity
self.pools_identities.update(
{
new_identity.identity_pool_id: {
"IdentityPoolId": new_identity.identity_pool_id,
"Identities": [],
}
}
)
return new_identity.to_json()
def update_identity_pool(
self,
identity_pool_id: str,
identity_pool_name: str,
allow_unauthenticated: Optional[bool],
login_providers: Optional[dict[str, str]],
provider_name: Optional[str],
provider_arns: Optional[list[str]],
identity_providers: Optional[list[dict[str, Any]]],
saml_providers: Optional[list[str]],
tags: Optional[dict[str, str]],
) -> str:
"""
The AllowClassic-parameter has not yet been implemented
"""
pool = self.identity_pools[identity_pool_id]
pool.identity_pool_name = pool.identity_pool_name or identity_pool_name
if allow_unauthenticated is not None:
pool.allow_unauthenticated_identities = allow_unauthenticated
if login_providers is not None:
pool.supported_login_providers = login_providers
if provider_name:
pool.developer_provider_name = provider_name
if provider_arns is not None:
pool.open_id_connect_provider_arns = provider_arns
if identity_providers is not None:
pool.cognito_identity_providers = identity_providers
if saml_providers is not None:
pool.saml_provider_arns = saml_providers
if tags:
pool.tags = tags
return pool.to_json()
def get_id(self, identity_pool_id: str) -> str:
# This call does not have to be authenticated, which means we do not know to which region it was sent originally
# But the identity_pool_id is always prefixed with the region, so we just that to determine the right region
#
# Note that this does mean that we lose a potential error scenario,
# where the user requests an ID for identity pool `us-west-1:...` in us-west-2
# But because we don't always know that the request was sent to us-west-2, there's nothing we can do
#
region = identity_pool_id.split(":")[0]
backend: CognitoIdentityBackend = cognitoidentity_backends[self.account_id][
region
]
identity_id = {"IdentityId": get_random_identity_id(self.region_name)}
backend.pools_identities[identity_pool_id]["Identities"].append(identity_id)
return json.dumps(identity_id)
def get_credentials_for_identity(self, identity_id: str) -> str:
duration = 90
now = utcnow()
expiration = now + datetime.timedelta(seconds=duration)
return json.dumps(
{
"Credentials": {
"AccessKeyId": "TESTACCESSKEY12345",
"Expiration": expiration.timestamp(),
"SecretKey": "ABCSECRETKEY",
"SessionToken": "ABC12345",
},
"IdentityId": identity_id,
}
)
def get_open_id_token_for_developer_identity(self, identity_id: str) -> str:
return json.dumps(
{
"IdentityId": identity_id,
"Token": get_random_identity_id(self.region_name),
}
)
def get_open_id_token(self, identity_id: str) -> str:
return json.dumps(
{
"IdentityId": identity_id,
"Token": get_random_identity_id(self.region_name),
}
)
def list_identities(self, identity_pool_id: str) -> str:
"""
The MaxResults-parameter has not yet been implemented
"""
return json.dumps(self.pools_identities[identity_pool_id])
def list_identity_pools(self) -> str:
"""
The MaxResults-parameter has not yet been implemented
"""
return json.dumps(
{
"IdentityPools": [
json.loads(pool.to_json()) for pool in self.identity_pools.values()
]
}
)
def delete_identity_pool(self, identity_pool_id: str) -> None:
self.describe_identity_pool(identity_pool_id)
del self.identity_pools[identity_pool_id]
cognitoidentity_backends = BackendDict(CognitoIdentityBackend, "cognito-identity")
|