File: iam_instance_profile.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (160 lines) | stat: -rw-r--r-- 6,280 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from typing import Any, Optional

from moto.core.common_models import CloudFormationModel
from moto.ec2.models.instances import Instance
from moto.iam.models import InstanceProfile

from ..exceptions import (
    IncorrectStateIamProfileAssociationError,
    InvalidAssociationIDIamProfileAssociationError,
)
from ..utils import (
    filter_iam_instance_profile_associations,
    filter_iam_instance_profiles,
    random_iam_instance_profile_association_id,
)


class IamInstanceProfileAssociation(CloudFormationModel):
    """Association between one EC2 instance and one IAM instance profile.

    Constructing the object also calls
    ``ec2_backend.modify_instance_attribute`` so that the instance's
    ``iam_instance_profile`` attribute is set to the profile ARN and the
    association id.
    """

    def __init__(
        self,
        ec2_backend: Any,
        association_id: str,
        instance: Instance,
        iam_instance_profile: InstanceProfile,
    ):
        # Plain bookkeeping first; ``instance.id`` is read here, before the
        # profile ARN is touched.
        self.ec2_backend = ec2_backend
        self.association_id = association_id
        self.instance, self.instance_id = instance, instance.id
        self.iam_instance_profile = iam_instance_profile
        self.state = "associated"

        # Record the association on the instance itself.
        target_instance_id = instance.id
        profile_attribute = {"Arn": iam_instance_profile.arn, "Id": association_id}
        ec2_backend.modify_instance_attribute(
            target_instance_id, "iam_instance_profile", profile_attribute
        )


class IamInstanceProfileAssociationBackend:
    """Store and manage IAM instance profile associations for EC2 instances.

    Associations live in ``iam_instance_profile_associations``, keyed by
    instance id. ``account_id``, ``partition`` and ``get_instance`` are not
    defined in this class; the methods expect the concrete backend object to
    provide them.
    """

    def __init__(self) -> None:
        # Keyed by instance id: an instance holds at most one association.
        self.iam_instance_profile_associations: dict[
            str, IamInstanceProfileAssociation
        ] = {}

    def _find_iam_instance_profile_association(
        self, association_id: str
    ) -> Optional[tuple[str, IamInstanceProfileAssociation]]:
        """Return ``(instance_id, association)`` for *association_id*, or None."""
        for instance_id, association in self.iam_instance_profile_associations.items():
            if association.association_id == association_id:
                return instance_id, association
        return None

    def associate_iam_instance_profile(
        self,
        instance_id: str,
        iam_instance_profile_name: Optional[str] = None,
        iam_instance_profile_arn: Optional[str] = None,
    ) -> IamInstanceProfileAssociation:
        """Associate an IAM instance profile with an instance.

        The profile is resolved by ``filter_iam_instance_profiles`` from the
        given name and/or ARN.

        Returns:
            The newly created association, also stored under ``instance_id``.

        Raises:
            IncorrectStateIamProfileAssociationError: the instance already has
                an association.
        """
        # The id is generated first, before any lookup or validation.
        iam_association_id = random_iam_instance_profile_association_id()

        instance_profile = filter_iam_instance_profiles(
            self.account_id,  # type: ignore[attr-defined]
            partition=self.partition,  # type: ignore[attr-defined]
            iam_instance_profile_arn=iam_instance_profile_arn,
            iam_instance_profile_name=iam_instance_profile_name,
        )

        if instance_id in self.iam_instance_profile_associations:
            raise IncorrectStateIamProfileAssociationError(instance_id)

        # NOTE(review): a falsy instance_id passes None as the instance, which
        # IamInstanceProfileAssociation.__init__ dereferences (``instance.id``).
        association = IamInstanceProfileAssociation(
            self,
            iam_association_id,
            self.get_instance(instance_id) if instance_id else None,  # type: ignore[attr-defined, arg-type]
            instance_profile,
        )
        # According to AWS, an instance can have only one association.
        self.iam_instance_profile_associations[instance_id] = association
        return association

    def describe_iam_instance_profile_associations(
        self,
        association_ids: list[str],
        filters: Any = None,
        max_results: int = 100,
        next_token: Optional[str] = None,
    ) -> tuple[list[IamInstanceProfileAssociation], Optional[str]]:
        """Return one page of associations plus the token for the next page.

        Args:
            association_ids: Restrict the result to these association ids; an
                empty value selects every stored association.
            filters: Passed to ``filter_iam_instance_profile_associations``.
            max_results: Page size; a falsy value falls back to 100.
            next_token: Start offset as a string; a falsy value means 0.

        Returns:
            ``(page, next_token)`` where ``next_token`` is the end offset as a
            string, or None when the page reaches the end of the results.
        """
        stored = self.iam_instance_profile_associations.values()
        if association_ids:
            associations_list = [
                association
                for association in stored
                if association.association_id in association_ids
            ]
        else:
            # No association ids were given, so every association is shown.
            associations_list = list(stored)

        associations_list = filter_iam_instance_profile_associations(
            associations_list, filters
        )

        starting_point = int(next_token or 0)
        ending_point = starting_point + int(max_results or 100)
        associations_page = associations_list[starting_point:ending_point]
        has_more = ending_point < len(associations_list)
        new_next_token = str(ending_point) if has_more else None

        return associations_page, new_next_token

    def disassociate_iam_instance_profile(
        self, association_id: str
    ) -> IamInstanceProfileAssociation:
        """Remove the association with *association_id* and return it.

        The instance's ``iam_instance_profile`` attribute is reset to None
        through the association's ``ec2_backend``.

        Raises:
            InvalidAssociationIDIamProfileAssociationError: no stored
                association has this id.
        """
        match = self._find_iam_instance_profile_association(association_id)
        if match is None:
            raise InvalidAssociationIDIamProfileAssociationError(association_id)

        instance_id, association = match
        del self.iam_instance_profile_associations[instance_id]
        association.ec2_backend.modify_instance_attribute(
            instance_id, "iam_instance_profile", None
        )
        return association

    def replace_iam_instance_profile_association(
        self,
        association_id: str,
        iam_instance_profile_name: Optional[str] = None,
        iam_instance_profile_arn: Optional[str] = None,
    ) -> IamInstanceProfileAssociation:
        """Point an existing association at a different IAM instance profile.

        The profile is resolved by ``filter_iam_instance_profiles`` before the
        association is looked up.

        Returns:
            The updated association (same object, same association id).

        Raises:
            InvalidAssociationIDIamProfileAssociationError: no stored
                association has this id.
        """
        instance_profile = filter_iam_instance_profiles(
            self.account_id,  # type: ignore[attr-defined]
            partition=self.partition,  # type: ignore[attr-defined]
            iam_instance_profile_arn=iam_instance_profile_arn,
            iam_instance_profile_name=iam_instance_profile_name,
        )

        match = self._find_iam_instance_profile_association(association_id)
        if match is None:
            raise InvalidAssociationIDIamProfileAssociationError(association_id)

        instance_id, association = match
        association.iam_instance_profile = instance_profile
        # Keep the instance's own attribute in step with the association, using
        # the same payload shape the association constructor writes; without
        # this the instance would keep reporting the replaced profile's ARN.
        association.ec2_backend.modify_instance_attribute(
            instance_id,
            "iam_instance_profile",
            {"Arn": instance_profile.arn, "Id": association.association_id},
        )
        return association