1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
|
"""SyntheticsBackend class with methods for supported APIs."""
import datetime
import uuid
from typing import Optional
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import iso_8601_datetime_with_milliseconds
class Canary(BaseModel):  # pylint: disable=too-many-instance-attributes,too-few-public-methods
    """
    Represents a CloudWatch Synthetics Canary resource.

    Every constructor argument is stored on the instance under an attribute
    of the same name. In addition, each canary receives a random UUID
    (``id``), starts in the ``"READY"`` state, and records its creation time
    in ``timeline``.
    """

    def __init__(  # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals
        self,
        name: str,
        code: dict[str, object],
        artifact_s3_location: str,
        execution_role_arn: str,
        schedule: dict[str, object],
        run_config: dict[str, object],
        success_retention_period_in_days: int,
        failure_retention_period_in_days: int,
        runtime_version: str,
        vpc_config: Optional[dict[str, object]],
        resources_to_replicate_tags: Optional[list[str]],
        provisioned_resource_cleanup: Optional[str],
        browser_configs: Optional[list[dict[str, object]]],
        tags: Optional[dict[str, str]],
        artifact_config: Optional[dict[str, object]],
    ):
        """
        Create a canary from the supplied settings.

        ``browser_configs`` and ``tags`` fall back to an empty list / dict
        when a falsy value is given; all other arguments are stored as-is.
        """
        self.name = name
        self.id = str(uuid.uuid4())
        self.code = code
        self.artifact_s3_location = artifact_s3_location
        self.execution_role_arn = execution_role_arn
        self.schedule = schedule
        self.run_config = run_config
        self.success_retention_period_in_days = success_retention_period_in_days
        self.failure_retention_period_in_days = failure_retention_period_in_days
        self.runtime_version = runtime_version
        self.vpc_config = vpc_config
        # Stored for completeness; not part of the to_dict() output.
        self.resources_to_replicate_tags = resources_to_replicate_tags
        self.provisioned_resource_cleanup = provisioned_resource_cleanup
        self.browser_configs = browser_configs or []
        self.tags = tags or {}
        self.artifact_config = artifact_config
        self.state = "READY"

        # Naive UTC timestamp, equivalent to the deprecated
        # datetime.utcnow() but without the DeprecationWarning on 3.12+.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        self.timeline: dict[str, Optional[datetime.datetime]] = {
            "Created": now,
            "LastModified": now,
            "LastStarted": None,
            "LastStopped": None,
        }

    @staticmethod
    def _format_timestamp(value: Optional[datetime.datetime]) -> Optional[str]:
        """Return the ISO-8601 (millisecond) form of *value*, or None if unset."""
        if value is None:
            return None
        return iso_8601_datetime_with_milliseconds(value)

    def to_dict(self) -> dict[str, object]:
        """
        Convert the Canary object to a dictionary representation.

        The ``Status`` entry reports the current ``state`` together with a
        fixed reason / reason code. Every ``Timeline`` entry is taken from
        ``self.timeline``; entries that have not been set are emitted as
        ``None``.
        """
        return {
            "Id": self.id,
            "Name": self.name,
            "Code": self.code,
            "ArtifactS3Location": self.artifact_s3_location,
            "ExecutionRoleArn": self.execution_role_arn,
            "Schedule": self.schedule,
            "RunConfig": self.run_config,
            "SuccessRetentionPeriodInDays": self.success_retention_period_in_days,
            "FailureRetentionPeriodInDays": self.failure_retention_period_in_days,
            "RuntimeVersion": self.runtime_version,
            "VpcConfig": self.vpc_config,
            "ProvisionedResourceCleanup": self.provisioned_resource_cleanup,
            "BrowserConfigs": self.browser_configs,
            "Tags": self.tags,
            "ArtifactConfig": self.artifact_config,
            "Status": {
                "State": self.state,
                "StateReason": "Created by Moto",
                "StateReasonCode": "CREATE_COMPLETE",
            },
            "Timeline": {
                "Created": self._format_timestamp(self.timeline["Created"]),
                "LastModified": self._format_timestamp(
                    self.timeline["LastModified"]
                ),
                # Previously hard-coded to None; now reflects the stored
                # values so the output cannot drift from self.timeline.
                "LastStarted": self._format_timestamp(
                    self.timeline["LastStarted"]
                ),
                "LastStopped": self._format_timestamp(
                    self.timeline["LastStopped"]
                ),
            },
        }
class SyntheticsBackend(BaseBackend):
    """Implementation of Synthetics APIs."""

    def __init__(self, region_name: str, account_id: str):
        """
        Initialize the SyntheticsBackend with region and account.
        """
        super().__init__(region_name, account_id)
        # Canaries keyed by their name.
        self.canaries: dict[str, Canary] = {}

    def create_canary(  # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals
        self,
        name: str,
        code: dict[str, object],
        artifact_s3_location: str,
        execution_role_arn: str,
        schedule: dict[str, object],
        run_config: dict[str, object],
        success_retention_period_in_days: int,
        failure_retention_period_in_days: int,
        runtime_version: str,
        vpc_config: Optional[dict[str, object]],
        resources_to_replicate_tags: Optional[list[str]],
        provisioned_resource_cleanup: Optional[str],
        browser_configs: Optional[list[dict[str, object]]],
        tags: Optional[dict[str, str]],
        artifact_config: Optional[dict[str, object]],
    ) -> Canary:
        """
        Create a canary and register it under ``name``.

        An existing canary with the same name is replaced. Returns the newly
        created canary.
        """
        # Keyword arguments keep the long, same-typed parameter list from
        # being silently transposed.
        canary = Canary(
            name=name,
            code=code,
            artifact_s3_location=artifact_s3_location,
            execution_role_arn=execution_role_arn,
            schedule=schedule,
            run_config=run_config,
            success_retention_period_in_days=success_retention_period_in_days,
            failure_retention_period_in_days=failure_retention_period_in_days,
            runtime_version=runtime_version,
            vpc_config=vpc_config,
            resources_to_replicate_tags=resources_to_replicate_tags,
            provisioned_resource_cleanup=provisioned_resource_cleanup,
            browser_configs=browser_configs,
            tags=tags,
            artifact_config=artifact_config,
        )
        self.canaries[name] = canary
        return canary

    def get_canary(self, name: str, dry_run_id: Optional[str] = None) -> Canary:  # pylint: disable=unused-argument
        """
        Return the canary registered under ``name``.

        The dry_run_id parameter is not yet supported; it is accepted for
        API compatibility and ignored.

        Raises:
            KeyError: if no canary with that name exists.
        """
        return self.canaries[name]

    def describe_canaries(
        self,
        next_token: Optional[str],  # pylint: disable=unused-argument
        max_results: Optional[int],  # pylint: disable=unused-argument
        names: Optional[list[str]],
    ) -> tuple[list[Canary], None]:
        """
        Return all canaries, optionally restricted to the given ``names``.

        Pagination is not yet supported: ``next_token`` and ``max_results``
        are ignored and the returned token is always ``None``.
        """
        canaries = list(self.canaries.values())
        if names:
            canaries = [c for c in canaries if c.name in names]
        return canaries, None

    def list_tags_for_resource(self, resource_arn: str) -> dict[str, str]:
        """
        Return the tags of the canary identified by ``resource_arn``.

        ``resource_arn`` may be either a bare canary name or a canary ARN
        whose last ``:``-separated segment is the name. An unknown resource
        yields an empty dict.
        """
        canary = self.canaries.get(resource_arn)
        if canary is None:
            # Not stored under the raw value: treat it as an ARN and look
            # up by its trailing segment.
            canary = self.canaries.get(resource_arn.rsplit(":", 1)[-1])
        return canary.tags if canary else {}
# Exported backend dict for Moto Synthetics: the module-level registry built
# from BackendDict for SyntheticsBackend under the service name "synthetics".
# NOTE(review): BackendDict presumably hands out one backend per account and
# region — confirm against moto.core.base_backend.
synthetics_backends = BackendDict(SyntheticsBackend, "synthetics")
|