1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
|
from collections import OrderedDict
from typing import Any, Optional
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.mediaconnect.exceptions import NotFoundException
from moto.moto_api._internal import mock_random as random
from moto.utilities.tagging_service import TaggingService
from moto.utilities.utils import get_partition
class Flow(BaseModel):
def __init__(self, account_id: str, region_name: str, **kwargs: Any):
self.id = random.uuid4().hex
self.availability_zone = kwargs.get("availability_zone")
self.entitlements = kwargs.get("entitlements", [])
self.name = kwargs.get("name")
self.outputs = kwargs.get("outputs", [])
self.source = kwargs.get("source", {})
self.source_failover_config = kwargs.get("source_failover_config", {})
self.sources = kwargs.get("sources", [])
self.vpc_interfaces = kwargs.get("vpc_interfaces", [])
self.status: Optional[str] = (
"STANDBY" # one of 'STANDBY'|'ACTIVE'|'UPDATING'|'DELETING'|'STARTING'|'STOPPING'|'ERROR'
)
self._previous_status: Optional[str] = None
self.description = "A Moto test flow"
self.flow_arn = f"arn:{get_partition(region_name)}:mediaconnect:{region_name}:{account_id}:flow:{self.id}:{self.name}"
self.egress_ip = "127.0.0.1"
self.maintenance = kwargs.get("maintenance", {})
if self.source and not self.sources:
self.sources = [
self.source,
]
def to_dict(self, include: Optional[list[str]] = None) -> dict[str, Any]:
data = {
"availabilityZone": self.availability_zone,
"description": self.description,
"egressIp": self.egress_ip,
"entitlements": self.entitlements,
"flowArn": self.flow_arn,
"name": self.name,
"outputs": self.outputs,
"source": self.source,
"sourceFailoverConfig": self.source_failover_config,
"sources": self.sources,
"status": self.status,
"vpcInterfaces": self.vpc_interfaces,
}
if self.maintenance:
data["maintenance"] = self.maintenance
if include:
new_data = {k: v for k, v in data.items() if k in include}
if "sourceType" in include:
new_data["sourceType"] = "OWNED"
return new_data
return data
def resolve_transient_states(self) -> None:
if self.status in ["STARTING"]:
self.status = "ACTIVE"
if self.status in ["STOPPING"]:
self.status = "STANDBY"
if self.status in ["UPDATING"]:
self.status = self._previous_status
self._previous_status = None
class MediaConnectBackend(BaseBackend):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self._flows: dict[str, Flow] = OrderedDict()
self.tagger = TaggingService()
def _add_source_details(
self,
source: Optional[dict[str, Any]],
flow_id: str,
ingest_ip: str = "127.0.0.1",
) -> None:
if source:
source["sourceArn"] = (
f"arn:{get_partition(self.region_name)}:mediaconnect:{self.region_name}:{self.account_id}:source"
f":{flow_id}:{source['name']}"
)
if not source.get("entitlementArn"):
source["ingestIp"] = ingest_ip
def _add_entitlement_details(
self, entitlement: Optional[dict[str, Any]], entitlement_id: str
) -> None:
if entitlement:
entitlement["entitlementArn"] = (
f"arn:{get_partition(self.region_name)}:mediaconnect:{self.region_name}"
f":{self.account_id}:entitlement:{entitlement_id}"
f":{entitlement['name']}"
)
def _create_flow_add_details(self, flow: Flow) -> None:
for index, _source in enumerate(flow.sources):
self._add_source_details(_source, flow.id, f"127.0.0.{index}")
for index, output in enumerate(flow.outputs or []):
if output.get("protocol") in ["srt-listener", "zixi-pull"]:
output["listenerAddress"] = f"{index}.0.0.0"
output_id = random.uuid4().hex
arn = (
f"arn:{get_partition(self.region_name)}:mediaconnect:{self.region_name}"
f":{self.account_id}:output:{output_id}:{output['name']}"
)
output["outputArn"] = arn
for _, entitlement in enumerate(flow.entitlements):
entitlement_id = random.uuid4().hex
self._add_entitlement_details(entitlement, entitlement_id)
def create_flow(
self,
availability_zone: str,
entitlements: list[dict[str, Any]],
name: str,
outputs: list[dict[str, Any]],
source: dict[str, Any],
source_failover_config: dict[str, Any],
sources: list[dict[str, Any]],
vpc_interfaces: list[dict[str, Any]],
maintenance: Optional[list[dict[str, Any]]] = None,
) -> Flow:
flow = Flow(
account_id=self.account_id,
region_name=self.region_name,
availability_zone=availability_zone,
entitlements=entitlements,
name=name,
outputs=outputs,
source=source,
source_failover_config=source_failover_config,
sources=sources,
vpc_interfaces=vpc_interfaces,
maintenance=maintenance,
)
self._create_flow_add_details(flow)
self._flows[flow.flow_arn] = flow
return flow
def list_flows(self, max_results: Optional[int]) -> list[dict[str, Any]]:
"""
Pagination is not yet implemented
"""
flows = list(self._flows.values())
if max_results is not None:
flows = flows[:max_results]
return [
fl.to_dict(
include=[
"availabilityZone",
"description",
"flowArn",
"name",
"sourceType",
"status",
]
)
for fl in flows
]
def describe_flow(self, flow_arn: str) -> Flow:
if flow_arn in self._flows:
flow = self._flows[flow_arn]
flow.resolve_transient_states()
return flow
raise NotFoundException(message="Flow not found.")
def delete_flow(self, flow_arn: str) -> Flow:
if flow_arn in self._flows:
return self._flows.pop(flow_arn)
raise NotFoundException(message="Flow not found.")
def start_flow(self, flow_arn: str) -> Flow:
if flow_arn in self._flows:
flow = self._flows[flow_arn]
flow.status = "STARTING"
return flow
raise NotFoundException(message="Flow not found.")
def stop_flow(self, flow_arn: str) -> Flow:
if flow_arn in self._flows:
flow = self._flows[flow_arn]
flow.status = "STOPPING"
return flow
raise NotFoundException(message="Flow not found.")
def tag_resource(self, resource_arn: str, tags: dict[str, Any]) -> None:
tag_list = TaggingService.convert_dict_to_tags_input(tags)
self.tagger.tag_resource(resource_arn, tag_list)
def list_tags_for_resource(self, resource_arn: str) -> dict[str, str]:
if self.tagger.has_tags(resource_arn):
return self.tagger.get_tag_dict_for_resource(resource_arn)
raise NotFoundException(message="Resource not found.")
def add_flow_vpc_interfaces(
self, flow_arn: str, vpc_interfaces: list[dict[str, Any]]
) -> Flow:
if flow_arn in self._flows:
flow = self._flows[flow_arn]
flow.vpc_interfaces = vpc_interfaces
return flow
raise NotFoundException(message=f"flow with arn={flow_arn} not found")
def add_flow_outputs(self, flow_arn: str, outputs: list[dict[str, Any]]) -> Flow:
if flow_arn in self._flows:
flow = self._flows[flow_arn]
flow.outputs = outputs
return flow
raise NotFoundException(message=f"flow with arn={flow_arn} not found")
def remove_flow_vpc_interface(self, flow_arn: str, vpc_interface_name: str) -> None:
if flow_arn in self._flows:
flow = self._flows[flow_arn]
flow.vpc_interfaces = [
vpc_interface
for vpc_interface in self._flows[flow_arn].vpc_interfaces
if vpc_interface["name"] != vpc_interface_name
]
else:
raise NotFoundException(message=f"flow with arn={flow_arn} not found")
def remove_flow_output(self, flow_arn: str, output_name: str) -> None:
if flow_arn in self._flows:
flow = self._flows[flow_arn]
flow.outputs = [
output
for output in self._flows[flow_arn].outputs
if output["name"] != output_name
]
else:
raise NotFoundException(message=f"flow with arn={flow_arn} not found")
def update_flow_output(
self,
flow_arn: str,
output_arn: str,
cidr_allow_list: list[str],
description: str,
destination: str,
encryption: dict[str, str],
max_latency: int,
media_stream_output_configuration: list[dict[str, Any]],
min_latency: int,
port: int,
protocol: str,
remote_id: str,
sender_control_port: int,
sender_ip_address: str,
smoothing_latency: int,
stream_id: str,
vpc_interface_attachment: dict[str, str],
) -> dict[str, Any]:
if flow_arn not in self._flows:
raise NotFoundException(message=f"flow with arn={flow_arn} not found")
flow = self._flows[flow_arn]
for output in flow.outputs:
if output["outputArn"] == output_arn:
output["cidrAllowList"] = cidr_allow_list
output["description"] = description
output["destination"] = destination
output["encryption"] = encryption
output["maxLatency"] = max_latency
output["mediaStreamOutputConfiguration"] = (
media_stream_output_configuration
)
output["minLatency"] = min_latency
output["port"] = port
output["protocol"] = protocol
output["remoteId"] = remote_id
output["senderControlPort"] = sender_control_port
output["senderIpAddress"] = sender_ip_address
output["smoothingLatency"] = smoothing_latency
output["streamId"] = stream_id
output["vpcInterfaceAttachment"] = vpc_interface_attachment
return output
raise NotFoundException(message=f"output with arn={output_arn} not found")
def add_flow_sources(
self, flow_arn: str, sources: list[dict[str, Any]]
) -> list[dict[str, Any]]:
if flow_arn not in self._flows:
raise NotFoundException(message=f"flow with arn={flow_arn} not found")
flow = self._flows[flow_arn]
for source in sources:
source_id = random.uuid4().hex
name = source["name"]
arn = f"arn:{get_partition(self.region_name)}:mediaconnect:{self.region_name}:{self.account_id}:source:{source_id}:{name}"
source["sourceArn"] = arn
flow.sources = sources
return sources
def update_flow_source(
self,
flow_arn: str,
source_arn: str,
decryption: str,
description: str,
entitlement_arn: str,
ingest_port: int,
max_bitrate: int,
max_latency: int,
max_sync_buffer: int,
media_stream_source_configurations: list[dict[str, Any]],
min_latency: int,
protocol: str,
sender_control_port: int,
sender_ip_address: str,
stream_id: str,
vpc_interface_name: str,
whitelist_cidr: str,
) -> Optional[dict[str, Any]]:
if flow_arn not in self._flows:
raise NotFoundException(message=f"flow with arn={flow_arn} not found")
flow = self._flows[flow_arn]
source: Optional[dict[str, Any]] = next(
iter(
[source for source in flow.sources if source["sourceArn"] == source_arn]
),
{},
)
if source:
source["decryption"] = decryption
source["description"] = description
source["entitlementArn"] = entitlement_arn
source["ingestPort"] = ingest_port
source["maxBitrate"] = max_bitrate
source["maxLatency"] = max_latency
source["maxSyncBuffer"] = max_sync_buffer
source["mediaStreamSourceConfigurations"] = (
media_stream_source_configurations
)
source["minLatency"] = min_latency
source["protocol"] = protocol
source["senderControlPort"] = sender_control_port
source["senderIpAddress"] = sender_ip_address
source["streamId"] = stream_id
source["vpcInterfaceName"] = vpc_interface_name
source["whitelistCidr"] = whitelist_cidr
return source
def grant_flow_entitlements(
self,
flow_arn: str,
entitlements: list[dict[str, Any]],
) -> list[dict[str, Any]]:
if flow_arn not in self._flows:
raise NotFoundException(message=f"flow with arn={flow_arn} not found")
flow = self._flows[flow_arn]
for entitlement in entitlements:
entitlement_id = random.uuid4().hex
name = entitlement["name"]
arn = f"arn:{get_partition(self.region_name)}:mediaconnect:{self.region_name}:{self.account_id}:entitlement:{entitlement_id}:{name}"
entitlement["entitlementArn"] = arn
flow.entitlements += entitlements
return entitlements
def revoke_flow_entitlement(self, flow_arn: str, entitlement_arn: str) -> None:
if flow_arn not in self._flows:
raise NotFoundException(message=f"flow with arn={flow_arn} not found")
flow = self._flows[flow_arn]
for entitlement in flow.entitlements:
if entitlement_arn == entitlement["entitlementArn"]:
flow.entitlements.remove(entitlement)
return
raise NotFoundException(
message=f"entitlement with arn={entitlement_arn} not found"
)
def update_flow_entitlement(
self,
flow_arn: str,
entitlement_arn: str,
description: str,
encryption: dict[str, str],
entitlement_status: str,
name: str,
subscribers: list[str],
) -> dict[str, Any]:
if flow_arn not in self._flows:
raise NotFoundException(message=f"flow with arn={flow_arn} not found")
flow = self._flows[flow_arn]
for entitlement in flow.entitlements:
if entitlement_arn == entitlement["entitlementArn"]:
entitlement["description"] = description
entitlement["encryption"] = encryption
entitlement["entitlementStatus"] = entitlement_status
entitlement["name"] = name
entitlement["subscribers"] = subscribers
return entitlement
raise NotFoundException(
message=f"entitlement with arn={entitlement_arn} not found"
)
mediaconnect_backends = BackendDict(MediaConnectBackend, "mediaconnect")
|