1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
|
"""CloudDirectoryBackend class with methods for supported APIs."""
import datetime
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.utilities.paginator import paginate
from moto.utilities.tagging_service import TaggingService
from .exceptions import (
InvalidArnException,
ResourceNotFoundException,
SchemaAlreadyPublishedException,
)
PAGINATION_MODEL = {
"list_directories": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "directory_arn",
},
"list_development_schema_arns": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "schema_arn",
},
"list_published_schema_arns": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "schema_arn",
},
}
class Directory(BaseModel):
def __init__(
self, account_id: str, region: str, name: str, schema_arn: str
) -> None:
self.name = name
self.schema_arn = schema_arn
self.directory_arn = (
f"arn:aws:clouddirectory:{region}:{account_id}:directory/{name}"
)
self.state = "ENABLED"
self.creation_date_time = datetime.datetime.now()
self.object_identifier = f"directory-{name}"
def to_dict(self) -> dict[str, str]:
return {
"Name": self.name,
"SchemaArn": self.schema_arn,
"DirectoryArn": self.directory_arn,
"State": self.state,
"CreationDateTime": str(self.creation_date_time),
"ObjectIdentifier": self.object_identifier,
}
class CloudDirectoryBackend(BaseBackend):
"""Implementation of CloudDirectory APIs."""
def __init__(self, region_name: str, account_id: str) -> None:
super().__init__(region_name, account_id)
self.directories: dict[str, Directory] = {}
self.schemas_states: dict[str, list[str]] = {
"development": [],
"published": [],
"applied": [],
}
self.tagger = TaggingService()
def apply_schema(self, directory_arn: str, published_schema_arn: str) -> None:
directory = self.directories.get(directory_arn)
if not directory:
raise ResourceNotFoundException(directory_arn)
if published_schema_arn not in self.schemas_states["published"]:
raise ResourceNotFoundException(published_schema_arn)
directory.schema_arn = published_schema_arn
return
def publish_schema(
self, name: str, version: str, development_schema_arn: str, minor_version: str
) -> str:
schema_arn = f"arn:aws:clouddirectory:{self.region_name}:{self.account_id}:schema/published/{name}/{version}/{minor_version}"
if development_schema_arn in self.schemas_states["published"]:
raise SchemaAlreadyPublishedException(development_schema_arn)
if development_schema_arn in self.schemas_states["development"]:
self.schemas_states["development"].remove(development_schema_arn)
self.schemas_states["published"].append(schema_arn)
else:
raise ResourceNotFoundException(development_schema_arn)
return schema_arn
def create_directory(self, name: str, schema_arn: str) -> Directory:
directory = Directory(self.account_id, self.region_name, name, schema_arn)
self.directories[directory.directory_arn] = directory
return directory
def create_schema(self, name: str) -> str:
self.schema_arn = f"arn:aws:clouddirectory:{self.region_name}:{self.account_id}:schema/development/{name}"
self.schemas_states["development"].append(self.schema_arn)
return self.schema_arn
def delete_schema(self, schema_arn: str) -> None:
if schema_arn in self.schemas_states["development"]:
self.schemas_states["development"].remove(schema_arn)
elif schema_arn in self.schemas_states["published"]:
self.schemas_states["published"].remove(schema_arn)
else:
raise ResourceNotFoundException(schema_arn)
return
@paginate(pagination_model=PAGINATION_MODEL)
def list_development_schema_arns(self) -> list[str]:
return self.schemas_states["development"]
@paginate(pagination_model=PAGINATION_MODEL)
def list_published_schema_arns(self) -> list[str]:
return self.schemas_states["published"]
@paginate(pagination_model=PAGINATION_MODEL)
def list_directories(self, state: str) -> list[Directory]:
directories = list(self.directories.values())
if state:
directories = [
directory for directory in directories if directory.state == state
]
return directories
def tag_resource(self, resource_arn: str, tags: list[dict[str, str]]) -> None:
self.tagger.tag_resource(resource_arn, tags)
return
def untag_resource(self, resource_arn: str, tag_keys: list[str]) -> None:
if not isinstance(tag_keys, list):
tag_keys = [tag_keys]
self.tagger.untag_resource_using_names(resource_arn, tag_keys)
return
def delete_directory(self, directory_arn: str) -> str:
directory = self.directories.pop(directory_arn)
return directory.directory_arn
def get_directory(self, directory_arn: str) -> Directory:
directory = self.directories.get(directory_arn)
if not directory:
raise InvalidArnException(directory_arn)
return directory
def list_tags_for_resource(
self, resource_arn: str, next_token: str, max_results: int
) -> list[dict[str, str]]:
tags = self.tagger.list_tags_for_resource(resource_arn)["Tags"]
return tags
clouddirectory_backends = BackendDict(CloudDirectoryBackend, "clouddirectory")
|