File: models.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (618 lines) | stat: -rw-r--r-- 22,915 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
import json
from typing import Any, Optional

from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import unix_time
from moto.iam.aws_managed_policies import aws_managed_policies_data
from moto.moto_api._internal import mock_random as random
from moto.utilities.paginator import paginate
from moto.utilities.utils import get_partition

from .exceptions import (
    ConflictException,
    ResourceNotFoundException,
    ServiceQuotaExceededException,
)
from .utils import PAGINATION_MODEL

# https://docs.aws.amazon.com/singlesignon/latest/userguide/limits.html
MAX_MANAGED_POLICIES_PER_PERMISSION_SET = 20


class AccountAssignment(BaseModel):
    def __init__(
        self,
        instance_arn: str,
        target_id: str,
        target_type: str,
        permission_set_arn: str,
        principal_type: str,
        principal_id: str,
    ):
        self.request_id = str(random.uuid4())
        self.instance_arn = instance_arn
        self.target_id = target_id
        self.target_type = target_type
        self.permission_set_arn = permission_set_arn
        self.principal_type = principal_type
        self.principal_id = principal_id
        self.created_date = unix_time()

    def to_json(
        self, include_creation_date: bool = False, include_request_id: bool = False
    ) -> dict[str, Any]:
        summary: dict[str, Any] = {
            "TargetId": self.target_id,
            "TargetType": self.target_type,
            "PermissionSetArn": self.permission_set_arn,
            "PrincipalType": self.principal_type,
            "PrincipalId": self.principal_id,
        }
        if include_creation_date:
            summary["CreatedDate"] = self.created_date
        if include_request_id:
            summary["RequestId"] = self.request_id
        return summary


class PermissionSet(BaseModel):
    def __init__(
        self,
        name: str,
        description: str,
        instance_arn: str,
        session_duration: str,
        relay_state: str,
        tags: list[dict[str, str]],
    ):
        self.name = name
        self.description = description
        self.instance_arn = instance_arn
        self.permission_set_arn = PermissionSet.generate_id(instance_arn)
        self.session_duration = session_duration
        self.relay_state = relay_state
        self.tags = tags
        self.created_date = unix_time()
        self.inline_policy = ""
        self.managed_policies: list[ManagedPolicy] = []
        self.customer_managed_policies: list[CustomerManagedPolicy] = []
        self.total_managed_policies_attached = (
            0  # this will also include customer managed policies
        )

    def to_json(self, include_creation_date: bool = False) -> dict[str, Any]:
        summary: dict[str, Any] = {
            "Name": self.name,
            "Description": self.description,
            "PermissionSetArn": self.permission_set_arn,
            "SessionDuration": self.session_duration,
            "RelayState": self.relay_state,
        }
        if include_creation_date:
            summary["CreatedDate"] = self.created_date
        return summary

    @staticmethod
    def generate_id(instance_arn: str) -> str:
        return instance_arn + "/ps-" + random.get_random_string(length=16).lower()


class ManagedPolicy(BaseModel):
    def __init__(self, arn: str, name: str):
        self.arn = arn
        self.name = name

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, ManagedPolicy):
            return False
        return self.arn == other.arn


class CustomerManagedPolicy(BaseModel):
    def __init__(self, name: str, path: str = "/"):
        self.name = name
        self.path = path

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, CustomerManagedPolicy):
            return False
        return f"{self.path}{self.name}" == f"{other.path}{other.name}"


class Instance:
    def __init__(self, account_id: str, region: str):
        self.created_date = unix_time()
        self.identity_store_id = (
            f"d-{random.get_random_string(length=10, lower_case=True)}"
        )
        self.instance_arn = f"arn:{get_partition(region)}:sso:::instance/ssoins-{random.get_random_string(length=16, lower_case=True)}"
        self.account_id = account_id
        self.status = "ACTIVE"
        self.name: Optional[str] = None

        self.provisioned_permission_sets: list[PermissionSet] = []

    def to_json(self) -> dict[str, Any]:
        return {
            "CreatedDate": self.created_date,
            "IdentityStoreId": self.identity_store_id,
            "InstanceArn": self.instance_arn,
            "Name": self.name,
            "OwnerAccountId": self.account_id,
            "Status": self.status,
        }


class SSOAdminBackend(BaseBackend):
    """Implementation of SSOAdmin APIs."""

    def __init__(self, region_name: str, account_id: str):
        super().__init__(region_name, account_id)
        self.account_assignments: list[AccountAssignment] = []
        self.deleted_account_assignments: list[AccountAssignment] = []
        self.permission_sets: list[PermissionSet] = []
        self.aws_managed_policies: Optional[dict[str, Any]] = None
        self.instances: list[Instance] = []

        self.instances.append(Instance(self.account_id, self.region_name))

    def create_account_assignment(
        self,
        instance_arn: str,
        target_id: str,
        target_type: str,
        permission_set_arn: str,
        principal_type: str,
        principal_id: str,
    ) -> dict[str, Any]:
        assignment = AccountAssignment(
            instance_arn,
            target_id,
            target_type,
            permission_set_arn,
            principal_type,
            principal_id,
        )
        self.account_assignments.append(assignment)
        return assignment.to_json(include_creation_date=True, include_request_id=True)

    def delete_account_assignment(
        self,
        instance_arn: str,
        target_id: str,
        target_type: str,
        permission_set_arn: str,
        principal_type: str,
        principal_id: str,
    ) -> dict[str, Any]:
        account = self._find_account(
            instance_arn,
            target_id,
            target_type,
            permission_set_arn,
            principal_type,
            principal_id,
        )
        self.deleted_account_assignments.append(account)
        self.account_assignments.remove(account)
        return account.to_json(include_creation_date=True, include_request_id=True)

    def _find_account(
        self,
        instance_arn: str,
        target_id: str,
        target_type: str,
        permission_set_arn: str,
        principal_type: str,
        principal_id: str,
    ) -> AccountAssignment:
        for account in self.account_assignments:
            instance_arn_match = account.instance_arn == instance_arn
            target_id_match = account.target_id == target_id
            target_type_match = account.target_type == target_type
            permission_set_match = account.permission_set_arn == permission_set_arn
            principal_type_match = account.principal_type == principal_type
            principal_id_match = account.principal_id == principal_id
            if (
                instance_arn_match
                and target_id_match
                and target_type_match
                and permission_set_match
                and principal_type_match
                and principal_id_match
            ):
                return account
        raise ResourceNotFoundException

    def _find_managed_policy(self, managed_policy_arn: str) -> ManagedPolicy:
        """
        Checks to make sure the managed policy exists.
        This pulls from moto/iam/aws_managed_policies.py
        """
        # Lazy loading of aws managed policies file
        if self.aws_managed_policies is None:
            self.aws_managed_policies = json.loads(aws_managed_policies_data)

        policy_name = managed_policy_arn.split("/")[-1]
        managed_policy = self.aws_managed_policies.get(policy_name, None)
        if managed_policy is not None:
            path = managed_policy.get("path", "/")
            expected_arn = f"arn:{self.partition}:iam::aws:policy{path}{policy_name}"

            if managed_policy_arn == expected_arn:
                return ManagedPolicy(managed_policy_arn, policy_name)
        raise ResourceNotFoundException(
            f"Policy does not exist with ARN: {managed_policy_arn}"
        )

    @paginate(PAGINATION_MODEL)
    def list_account_assignments(
        self, instance_arn: str, account_id: str, permission_set_arn: str
    ) -> list[dict[str, str]]:
        account_assignments = []
        for assignment in self.account_assignments:
            if (
                assignment.instance_arn == instance_arn
                and assignment.target_id == account_id
                and assignment.permission_set_arn == permission_set_arn
            ):
                account_assignments.append(
                    {
                        "AccountId": assignment.target_id,
                        "PermissionSetArn": assignment.permission_set_arn,
                        "PrincipalType": assignment.principal_type,
                        "PrincipalId": assignment.principal_id,
                    }
                )
        return account_assignments

    @paginate(PAGINATION_MODEL)
    def list_account_assignments_for_principal(
        self,
        filter_: dict[str, Any],
        instance_arn: str,
        principal_id: str,
        principal_type: str,
    ) -> list[dict[str, str]]:
        return [
            {
                "AccountId": account_assignment.target_id,
                "PermissionSetArn": account_assignment.permission_set_arn,
                "PrincipalId": account_assignment.principal_id,
                "PrincipalType": account_assignment.principal_type,
            }
            for account_assignment in self.account_assignments
            if all(
                [
                    filter_.get("AccountId", account_assignment.target_id)
                    == account_assignment.target_id,
                    principal_id == account_assignment.principal_id,
                    principal_type == account_assignment.principal_type,
                    instance_arn == account_assignment.instance_arn,
                ]
            )
        ]

    def create_permission_set(
        self,
        name: str,
        description: str,
        instance_arn: str,
        session_duration: str,
        relay_state: str,
        tags: list[dict[str, str]],
    ) -> dict[str, Any]:
        permission_set = PermissionSet(
            name,
            description,
            instance_arn,
            session_duration,
            relay_state,
            tags,
        )
        self.permission_sets.append(permission_set)
        return permission_set.to_json(True)

    def update_permission_set(
        self,
        instance_arn: str,
        permission_set_arn: str,
        description: str,
        session_duration: str,
        relay_state: str,
    ) -> dict[str, Any]:
        permission_set = self._find_permission_set(
            instance_arn,
            permission_set_arn,
        )
        self.permission_sets.remove(permission_set)
        permission_set.description = description
        permission_set.session_duration = session_duration
        permission_set.relay_state = relay_state
        self.permission_sets.append(permission_set)
        return permission_set.to_json(True)

    def describe_permission_set(
        self, instance_arn: str, permission_set_arn: str
    ) -> dict[str, Any]:
        permission_set = self._find_permission_set(
            instance_arn,
            permission_set_arn,
        )
        return permission_set.to_json(True)

    def delete_permission_set(
        self, instance_arn: str, permission_set_arn: str
    ) -> dict[str, Any]:
        permission_set = self._find_permission_set(
            instance_arn,
            permission_set_arn,
        )
        self.permission_sets.remove(permission_set)

        for instance in self.instances:
            try:
                instance.provisioned_permission_sets.remove(permission_set)
            except ValueError:
                pass

        return permission_set.to_json(include_creation_date=True)

    def _find_permission_set(
        self, instance_arn: str, permission_set_arn: str
    ) -> PermissionSet:
        for permission_set in self.permission_sets:
            instance_arn_match = permission_set.instance_arn == instance_arn
            permission_set_match = (
                permission_set.permission_set_arn == permission_set_arn
            )
            if instance_arn_match and permission_set_match:
                return permission_set
        ps_id = permission_set_arn.split("/")[-1]
        raise ResourceNotFoundException(
            message=f"Could not find PermissionSet with id {ps_id}"
        )

    @paginate(pagination_model=PAGINATION_MODEL)
    def list_permission_sets(self, instance_arn: str) -> list[PermissionSet]:
        permission_sets = []
        for permission_set in self.permission_sets:
            if permission_set.instance_arn == instance_arn:
                permission_sets.append(permission_set)
        return permission_sets

    def put_inline_policy_to_permission_set(
        self, instance_arn: str, permission_set_arn: str, inline_policy: str
    ) -> None:
        permission_set = self._find_permission_set(
            instance_arn,
            permission_set_arn,
        )
        permission_set.inline_policy = inline_policy

    def get_inline_policy_for_permission_set(
        self, instance_arn: str, permission_set_arn: str
    ) -> str:
        permission_set = self._find_permission_set(
            instance_arn,
            permission_set_arn,
        )
        return permission_set.inline_policy

    def delete_inline_policy_from_permission_set(
        self, instance_arn: str, permission_set_arn: str
    ) -> None:
        permission_set = self._find_permission_set(
            instance_arn,
            permission_set_arn,
        )
        permission_set.inline_policy = ""

    def attach_managed_policy_to_permission_set(
        self, instance_arn: str, permission_set_arn: str, managed_policy_arn: str
    ) -> None:
        permissionset = self._find_permission_set(
            instance_arn,
            permission_set_arn,
        )
        managed_policy = self._find_managed_policy(managed_policy_arn)

        permissionset_id = permission_set_arn.split("/")[-1]
        if managed_policy in permissionset.managed_policies:
            raise ConflictException(
                f"Permission set with id {permissionset_id} already has a typed link attachment to a manged policy with {managed_policy_arn}"
            )

        if (
            permissionset.total_managed_policies_attached
            >= MAX_MANAGED_POLICIES_PER_PERMISSION_SET
        ):
            permissionset_id = permission_set_arn.split("/")[-1]
            raise ServiceQuotaExceededException(
                f"You have exceeded AWS SSO limits. Cannot create ManagedPolicy more than {MAX_MANAGED_POLICIES_PER_PERMISSION_SET} for id {permissionset_id}. Please refer to https://docs.aws.amazon.com/singlesignon/latest/userguide/limits.html"
            )

        permissionset.managed_policies.append(managed_policy)
        permissionset.total_managed_policies_attached += 1

    @paginate(pagination_model=PAGINATION_MODEL)
    def list_managed_policies_in_permission_set(
        self,
        instance_arn: str,
        permission_set_arn: str,
    ) -> list[ManagedPolicy]:
        permissionset = self._find_permission_set(
            instance_arn,
            permission_set_arn,
        )
        return permissionset.managed_policies

    def _detach_managed_policy(
        self, instance_arn: str, permission_set_arn: str, managed_policy_arn: str
    ) -> None:
        # ensure permission_set exists
        permissionset = self._find_permission_set(
            instance_arn,
            permission_set_arn,
        )

        for managed_policy in permissionset.managed_policies:
            if managed_policy.arn == managed_policy_arn:
                permissionset.managed_policies.remove(managed_policy)
                permissionset.total_managed_policies_attached -= 1
                return

        raise ResourceNotFoundException(
            f"Could not find ManagedPolicy with arn {managed_policy_arn}"
        )

    def detach_managed_policy_from_permission_set(
        self, instance_arn: str, permission_set_arn: str, managed_policy_arn: str
    ) -> None:
        self._detach_managed_policy(
            instance_arn, permission_set_arn, managed_policy_arn
        )

    def attach_customer_managed_policy_reference_to_permission_set(
        self,
        instance_arn: str,
        permission_set_arn: str,
        customer_managed_policy_reference: dict[str, str],
    ) -> None:
        permissionset = self._find_permission_set(
            permission_set_arn=permission_set_arn, instance_arn=instance_arn
        )

        name = customer_managed_policy_reference["Name"]
        path = customer_managed_policy_reference.get("Path", "/")  # default path is "/"
        customer_managed_policy = CustomerManagedPolicy(name=name, path=path)

        if customer_managed_policy in permissionset.customer_managed_policies:
            raise ConflictException(
                f"Given customer managed policy with name: {name}  and path {path} already attached"
            )

        if (
            permissionset.total_managed_policies_attached
            >= MAX_MANAGED_POLICIES_PER_PERMISSION_SET
        ):
            raise ServiceQuotaExceededException(
                f"Cannot attach managed policy: number of attached managed policies is already at maximum {MAX_MANAGED_POLICIES_PER_PERMISSION_SET}"
            )

        permissionset.customer_managed_policies.append(customer_managed_policy)
        permissionset.total_managed_policies_attached += 1

    @paginate(pagination_model=PAGINATION_MODEL)
    def list_customer_managed_policy_references_in_permission_set(
        self, instance_arn: str, permission_set_arn: str
    ) -> list[CustomerManagedPolicy]:
        permissionset = self._find_permission_set(
            permission_set_arn=permission_set_arn, instance_arn=instance_arn
        )
        return permissionset.customer_managed_policies

    def _detach_customer_managed_policy_from_permissionset(
        self,
        instance_arn: str,
        permission_set_arn: str,
        customer_managed_policy_reference: dict[str, str],
    ) -> None:
        permissionset = self._find_permission_set(
            permission_set_arn=permission_set_arn, instance_arn=instance_arn
        )
        path: str = customer_managed_policy_reference.get("Path", "/")
        name: str = customer_managed_policy_reference["Name"]

        for customer_managed_policy in permissionset.customer_managed_policies:
            if (
                customer_managed_policy.name == name
                and customer_managed_policy.path == path
            ):
                permissionset.customer_managed_policies.remove(customer_managed_policy)
                permissionset.total_managed_policies_attached -= 1
                return

        raise ResourceNotFoundException(
            f"Given managed policy with name: {name}  and path {path} does not exist on PermissionSet"
        )

    def detach_customer_managed_policy_reference_from_permission_set(
        self,
        instance_arn: str,
        permission_set_arn: str,
        customer_managed_policy_reference: dict[str, str],
    ) -> None:
        self._detach_customer_managed_policy_from_permissionset(
            instance_arn=instance_arn,
            permission_set_arn=permission_set_arn,
            customer_managed_policy_reference=customer_managed_policy_reference,
        )

    def describe_account_assignment_creation_status(
        self, account_assignment_creation_request_id: str, instance_arn: str
    ) -> dict[str, Any]:
        for account in self.account_assignments:
            if account.request_id == account_assignment_creation_request_id:
                return account.to_json(
                    include_creation_date=True, include_request_id=True
                )

        raise ResourceNotFoundException

    def describe_account_assignment_deletion_status(
        self, account_assignment_deletion_request_id: str, instance_arn: str
    ) -> dict[str, Any]:
        for account in self.deleted_account_assignments:
            if account.request_id == account_assignment_deletion_request_id:
                return account.to_json(
                    include_creation_date=True, include_request_id=True
                )

        raise ResourceNotFoundException

    def list_instances(self) -> list[Instance]:
        return self.instances

    def update_instance(self, instance_arn: str, name: str) -> None:
        for instance in self.instances:
            if instance.instance_arn == instance_arn:
                instance.name = name

    def provision_permission_set(
        self, instance_arn: str, permission_set_arn: str
    ) -> None:
        """
        The TargetType/TargetId parameters are currently ignored - PermissionSets are simply provisioned to the caller's account
        """
        permission_set = self._find_permission_set(instance_arn, permission_set_arn)
        instance = [i for i in self.instances if i.instance_arn == instance_arn][0]
        instance.provisioned_permission_sets.append(permission_set)

    def list_permission_sets_provisioned_to_account(
        self, instance_arn: str
    ) -> list[PermissionSet]:
        """
        The following parameters are not yet implemented: AccountId, ProvisioningStatus, MaxResults, NextToken
        """
        for instance in self.instances:
            if instance.instance_arn == instance_arn:
                return instance.provisioned_permission_sets
        return []

    def list_accounts_for_provisioned_permission_set(
        self, instance_arn: str, permission_set_arn: str
    ) -> list[str]:
        """
        The following parameters are not yet implemented: MaxResults, NextToken, ProvisioningStatus
        """
        for instance in self.instances:
            if instance.instance_arn == instance_arn:
                for ps in instance.provisioned_permission_sets:
                    if ps.permission_set_arn == permission_set_arn:
                        return [self.account_id]
        return []


ssoadmin_backends = BackendDict(SSOAdminBackend, "sso-admin")