File: models.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (160 lines) | stat: -rw-r--r-- 6,029 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""CloudDirectoryBackend class with methods for supported APIs."""

import datetime

from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.utilities.paginator import paginate
from moto.utilities.tagging_service import TaggingService

from .exceptions import (
    InvalidArnException,
    ResourceNotFoundException,
    SchemaAlreadyPublishedException,
)

# Pagination settings passed to the ``@paginate`` decorators in this module.
# Every list operation shares the same token key, limit key and default page
# size; only the attribute that identifies an item differs per operation.
PAGINATION_MODEL = {
    operation: {
        "input_token": "next_token",
        "limit_key": "max_results",
        "limit_default": 100,
        "unique_attribute": unique_attribute,
    }
    for operation, unique_attribute in (
        ("list_directories", "directory_arn"),
        ("list_development_schema_arns", "schema_arn"),
        ("list_published_schema_arns", "schema_arn"),
    )
}


class Directory(BaseModel):
    """In-memory record describing a single Cloud Directory directory."""

    def __init__(
        self, account_id: str, region: str, name: str, schema_arn: str
    ) -> None:
        """Populate the directory's attributes.

        The directory ARN and the object identifier are both derived from
        ``name``. A new directory starts in the ``"ENABLED"`` state and is
        stamped with the current local time (a naive ``datetime``).
        """
        arn = f"arn:aws:clouddirectory:{region}:{account_id}:directory/{name}"
        created_at = datetime.datetime.now()

        self.name = name
        self.schema_arn = schema_arn
        self.directory_arn = arn
        self.state = "ENABLED"
        self.creation_date_time = created_at
        self.object_identifier = f"directory-{name}"

    def to_dict(self) -> dict[str, str]:
        """Return the directory's attributes keyed by their API field names.

        The creation timestamp is rendered with ``str()``; every other value
        is returned as stored.
        """
        created_text = str(self.creation_date_time)
        description = {
            "Name": self.name,
            "SchemaArn": self.schema_arn,
            "DirectoryArn": self.directory_arn,
            "State": self.state,
            "CreationDateTime": created_text,
            "ObjectIdentifier": self.object_identifier,
        }
        return description


class CloudDirectoryBackend(BaseBackend):
    """Implementation of CloudDirectory APIs.

    All state is held in memory: directories are stored by ARN and schema
    ARNs are tracked in per-lifecycle-state lists.
    """

    def __init__(self, region_name: str, account_id: str) -> None:
        super().__init__(region_name, account_id)
        # Directories keyed by their directory ARN.
        self.directories: dict[str, Directory] = {}
        # Schema ARNs grouped by lifecycle state. No method of this class
        # currently writes to the "applied" list.
        self.schemas_states: dict[str, list[str]] = {
            "development": [],
            "published": [],
            "applied": [],
        }
        self.tagger = TaggingService()

    def apply_schema(self, directory_arn: str, published_schema_arn: str) -> None:
        """Point an existing directory at a published schema.

        Raises:
            ResourceNotFoundException: if ``directory_arn`` is not a known
                directory, or ``published_schema_arn`` is not in the list of
                published schemas.
        """
        directory = self.directories.get(directory_arn)
        if not directory:
            raise ResourceNotFoundException(directory_arn)
        if published_schema_arn not in self.schemas_states["published"]:
            raise ResourceNotFoundException(published_schema_arn)
        directory.schema_arn = published_schema_arn

    def publish_schema(
        self, name: str, version: str, development_schema_arn: str, minor_version: str
    ) -> str:
        """Move a development schema to the published state.

        The development ARN is removed from the development list and a new
        published ARN, built from ``name``, ``version`` and ``minor_version``,
        is recorded in its place.

        Returns:
            The ARN of the published schema.

        Raises:
            SchemaAlreadyPublishedException: if ``development_schema_arn`` is
                itself a published ARN, or the resulting published ARN is
                already recorded.
            ResourceNotFoundException: if ``development_schema_arn`` is not a
                known development schema.
        """
        schema_arn = f"arn:aws:clouddirectory:{self.region_name}:{self.account_id}:schema/published/{name}/{version}/{minor_version}"
        development = self.schemas_states["development"]
        published = self.schemas_states["published"]

        if development_schema_arn in published:
            raise SchemaAlreadyPublishedException(development_schema_arn)
        if development_schema_arn not in development:
            raise ResourceNotFoundException(development_schema_arn)
        if schema_arn in published:
            # A development schema recreated under the same name and published
            # with the same version would otherwise add a duplicate entry to
            # the published list; the development schema is left untouched.
            raise SchemaAlreadyPublishedException(schema_arn)

        development.remove(development_schema_arn)
        published.append(schema_arn)
        return schema_arn

    def create_directory(self, name: str, schema_arn: str) -> Directory:
        """Create a directory, store it under its ARN, and return it.

        An existing directory with the same ARN is replaced.
        """
        directory = Directory(self.account_id, self.region_name, name, schema_arn)
        self.directories[directory.directory_arn] = directory
        return directory

    def create_schema(self, name: str) -> str:
        """Register a new development schema and return its ARN."""
        # Kept local: the ARN is per-call data, not backend-wide state.
        schema_arn = f"arn:aws:clouddirectory:{self.region_name}:{self.account_id}:schema/development/{name}"
        self.schemas_states["development"].append(schema_arn)
        return schema_arn

    def delete_schema(self, schema_arn: str) -> None:
        """Remove a schema ARN from the development or published list.

        The development list is checked first.

        Raises:
            ResourceNotFoundException: if the ARN is in neither list.
        """
        if schema_arn in self.schemas_states["development"]:
            self.schemas_states["development"].remove(schema_arn)
        elif schema_arn in self.schemas_states["published"]:
            self.schemas_states["published"].remove(schema_arn)
        else:
            raise ResourceNotFoundException(schema_arn)

    # NOTE(review): the two schema listings return plain strings while
    # PAGINATION_MODEL names "schema_arn" as their unique attribute - confirm
    # that ``paginate`` copes with items that lack that attribute.
    @paginate(pagination_model=PAGINATION_MODEL)
    def list_development_schema_arns(self) -> list[str]:
        """Return the ARNs of all schemas in the development state."""
        return self.schemas_states["development"]

    @paginate(pagination_model=PAGINATION_MODEL)
    def list_published_schema_arns(self) -> list[str]:
        """Return the ARNs of all schemas in the published state."""
        return self.schemas_states["published"]

    @paginate(pagination_model=PAGINATION_MODEL)
    def list_directories(self, state: str) -> list[Directory]:
        """Return the stored directories, optionally filtered by state.

        A falsy ``state`` disables filtering; otherwise only directories
        whose ``state`` attribute equals ``state`` are returned.
        """
        directories = list(self.directories.values())
        if state:
            directories = [
                directory for directory in directories if directory.state == state
            ]
        return directories

    def tag_resource(self, resource_arn: str, tags: list[dict[str, str]]) -> None:
        """Attach ``tags`` to ``resource_arn`` via the tagging service."""
        self.tagger.tag_resource(resource_arn, tags)

    def untag_resource(self, resource_arn: str, tag_keys: list[str]) -> None:
        """Remove the tags named in ``tag_keys`` from ``resource_arn``.

        A non-list ``tag_keys`` value is wrapped in a single-element list
        before being handed to the tagging service.
        """
        if not isinstance(tag_keys, list):
            tag_keys = [tag_keys]
        self.tagger.untag_resource_using_names(resource_arn, tag_keys)

    def delete_directory(self, directory_arn: str) -> str:
        """Remove a directory and return its ARN.

        Raises:
            ResourceNotFoundException: if no directory is stored under
                ``directory_arn``.
        """
        # pop() with a default avoids leaking a bare KeyError for unknown
        # ARNs; the miss is reported the same way apply_schema reports it.
        directory = self.directories.pop(directory_arn, None)
        if directory is None:
            raise ResourceNotFoundException(directory_arn)
        return directory.directory_arn

    def get_directory(self, directory_arn: str) -> Directory:
        """Return the directory stored under ``directory_arn``.

        Raises:
            InvalidArnException: if no directory is stored under that ARN.
        """
        directory = self.directories.get(directory_arn)
        if not directory:
            raise InvalidArnException(directory_arn)
        return directory

    def list_tags_for_resource(
        self, resource_arn: str, next_token: str, max_results: int
    ) -> list[dict[str, str]]:
        """Return the tags recorded for ``resource_arn``.

        ``next_token`` and ``max_results`` are accepted but not used here:
        the complete ``"Tags"`` list from the tagging service is returned.
        """
        tags = self.tagger.list_tags_for_resource(resource_arn)["Tags"]
        return tags


# Module-level BackendDict for the "clouddirectory" service, built around
# CloudDirectoryBackend. Presumably it hands out one backend per
# account/region pair - confirm against moto.core.base_backend.
clouddirectory_backends = BackendDict(CloudDirectoryBackend, "clouddirectory")