1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
|
"""Handles incoming clouddirectory requests, invokes methods, returns responses."""
import json
from moto.core.responses import BaseResponse
from .models import CloudDirectoryBackend, clouddirectory_backends
class CloudDirectoryResponse(BaseResponse):
"""Handler for CloudDirectory requests and responses."""
def __init__(self) -> None:
super().__init__(service_name="clouddirectory")
@property
def clouddirectory_backend(self) -> "CloudDirectoryBackend":
"""Return backend instance specific for this region."""
return clouddirectory_backends[self.current_account][self.region]
def apply_schema(self) -> str:
directory_arn = self.headers.get("x-amz-data-partition")
published_schema_arn = self._get_param("PublishedSchemaArn")
self.clouddirectory_backend.apply_schema(
directory_arn=directory_arn,
published_schema_arn=published_schema_arn,
)
return json.dumps(
{
"AppliedSchemaArn": published_schema_arn,
"DirectoryArn": directory_arn,
}
)
def publish_schema(self) -> str:
development_schema_arn = self.headers.get("x-amz-data-partition")
version = self._get_param("Version")
minor_version = self._get_param("MinorVersion")
name = self._get_param("Name")
schema = self.clouddirectory_backend.publish_schema(
name=name,
version=version,
minor_version=minor_version,
development_schema_arn=development_schema_arn,
)
return json.dumps({"PublishedSchemaArn": schema})
def create_directory(self) -> str:
name = self._get_param("Name")
schema_arn = self.headers.get("x-amz-data-partition")
directory = self.clouddirectory_backend.create_directory(
name=name,
schema_arn=schema_arn,
)
return json.dumps(
{
"DirectoryArn": directory.directory_arn,
"Name": name,
"ObjectIdentifier": directory.object_identifier,
"AppliedSchemaArn": directory.schema_arn,
}
)
def create_schema(self) -> str:
name = self._get_param("Name")
schema = self.clouddirectory_backend.create_schema(
name=name,
)
return json.dumps({"SchemaArn": schema})
def list_directories(self) -> str:
next_token = self._get_param("NextToken")
max_results = self._get_param("MaxResults")
state = self._get_param("state")
directories, next_token = self.clouddirectory_backend.list_directories(
state=state, next_token=next_token, max_results=max_results
)
directory_list = [directory.to_dict() for directory in directories]
return json.dumps({"Directories": directory_list, "NextToken": next_token})
def tag_resource(self) -> str:
resource_arn = self._get_param("ResourceArn")
tags = self._get_param("Tags")
self.clouddirectory_backend.tag_resource(
resource_arn=resource_arn,
tags=tags,
)
return json.dumps({})
def untag_resource(self) -> str:
resource_arn = self._get_param("ResourceArn")
tag_keys = self._get_param("TagKeys")
self.clouddirectory_backend.untag_resource(
resource_arn=resource_arn,
tag_keys=tag_keys,
)
return json.dumps({})
def delete_directory(self) -> str:
# Retrieve arn from headers
# https://docs.aws.amazon.com/clouddirectory/latest/APIReference/API_DeleteDirectory.html
arn = self.headers.get("x-amz-data-partition")
directory_arn = self.clouddirectory_backend.delete_directory(
directory_arn=arn,
)
return json.dumps({"DirectoryArn": directory_arn})
def delete_schema(self) -> str:
# Retrieve arn from headers
# https://docs.aws.amazon.com/clouddirectory/latest/APIReference/API_DeleteSchema.html
arn = self.headers.get("x-amz-data-partition")
self.clouddirectory_backend.delete_schema(
schema_arn=arn,
)
return json.dumps({"SchemaArn": arn})
def list_development_schema_arns(self) -> str:
next_token = self._get_param("NextToken")
max_results = self._get_param("MaxResults")
schemas, next_token = self.clouddirectory_backend.list_development_schema_arns(
next_token=next_token,
max_results=max_results,
)
return json.dumps({"SchemaArns": schemas, "NextToken": next_token})
def list_published_schema_arns(self) -> str:
next_token = self._get_param("NextToken")
max_results = self._get_param("MaxResults")
schemas, next_token = self.clouddirectory_backend.list_published_schema_arns(
next_token=next_token,
max_results=max_results,
)
return json.dumps({"SchemaArns": schemas, "NextToken": next_token})
def get_directory(self) -> str:
# Retrieve arn from headers
# https://docs.aws.amazon.com/clouddirectory/latest/APIReference/API_GetDirectory.html
arn = self.headers.get("x-amz-data-partition")
directory = self.clouddirectory_backend.get_directory(
directory_arn=arn,
)
return json.dumps({"Directory": directory.to_dict()})
def list_tags_for_resource(self) -> str:
resource_arn = self._get_param("ResourceArn")
next_token = self._get_param("NextToken")
max_results = self._get_param("MaxResults")
tags = self.clouddirectory_backend.list_tags_for_resource(
resource_arn=resource_arn,
next_token=next_token,
max_results=max_results,
)
return json.dumps({"Tags": tags, "NextToken": next_token})
|