1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
|
from __future__ import annotations
import abc
from itertools import takewhile
from typing import Final, Optional
from moto.stepfunctions.parser.asl.component.eval_component import EvalComponent
from moto.stepfunctions.parser.asl.eval.environment import Environment
class ResourceCondition(str):
    """String constants naming the conditions a service resource may carry.

    These are the values compared against the ``option`` part of a parsed
    resource ARN (the text following the "." in the resource name).
    """

    # Plain synchronous integration.
    Sync = "sync"
    # Second revision of the synchronous integration.
    Sync2 = "sync:2"
    # Callback-style integration driven by a task token.
    WaitForTaskToken = "waitForTaskToken"
class ResourceARN:
    """Structured view of a Task-state resource ARN.

    The raw ARN is kept alongside the fields extracted from it. The layout
    parsed by ``from_arn`` is
    ``<prefix>:<partition>:<service>:<region>:<account>:<task_type>:<name>[.<option>]``.
    """

    arn: str
    partition: str
    service: str
    region: str
    account: str
    task_type: str
    name: str
    # Text following the first "." of the resource name. ``from_arn`` stores
    # an empty string (not None) when the ARN carries no such suffix.
    option: Optional[str]

    def __init__(
        self,
        arn: str,
        partition: str,
        service: str,
        region: str,
        account: str,
        task_type: str,
        name: str,
        option: Optional[str],
    ):
        """Store the already-split ARN fields verbatim; no validation is done."""
        self.arn = arn
        self.partition = partition
        self.service = service
        self.region = region
        self.account = account
        self.task_type = task_type
        self.name = name
        self.option = option

    @staticmethod
    def _consume_until(text: str, symbol: str) -> tuple[str, str]:
        """Split ``text`` at the first occurrence of ``symbol``.

        Returns:
            A ``(head, tail)`` pair: ``head`` is everything before the
            delimiter and ``tail`` everything after it (the delimiter itself
            is dropped). When the delimiter is absent, ``head`` is the whole
            text and ``tail`` is the empty string.
        """
        head, _, tail = text.partition(symbol)
        return head, tail

    @classmethod
    def from_arn(cls, arn: str) -> ResourceARN:
        """Parse ``arn`` into its components.

        The first six fields are separated by ":"; the leading one (the
        ``arn`` prefix) is discarded. The remainder is split at its first
        "." into ``name`` and ``option``, so ``name`` may itself contain
        colons. Missing fields come back as empty strings.
        """
        _, remainder = cls._consume_until(arn, ":")
        partition, remainder = cls._consume_until(remainder, ":")
        service, remainder = cls._consume_until(remainder, ":")
        region, remainder = cls._consume_until(remainder, ":")
        account, remainder = cls._consume_until(remainder, ":")
        task_type, remainder = cls._consume_until(remainder, ":")
        # Whatever follows the first "." of the resource name is the option.
        name, option = cls._consume_until(remainder, ".")
        return cls(
            arn=arn,
            partition=partition,
            service=service,
            region=region,
            account=account,
            task_type=task_type,
            name=name,
            option=option,
        )
class ResourceRuntimePart:
    """Account/region pair resolved for a resource at evaluation time."""

    account: Final[str]
    region: Final[str]

    def __init__(self, account: str, region: str):
        """Record the given ``account`` and ``region`` unchanged."""
        self.account = account
        self.region = region
class Resource(EvalComponent, abc.ABC):
    """Base component for a resource referenced by its ARN.

    Keeps the raw ARN and partition, plus the region and account embedded in
    the ARN (either may be an empty string).
    """

    _region: Final[str]
    _account: Final[str]
    resource_arn: Final[str]
    partition: Final[str]

    def __init__(self, resource_arn: ResourceARN):
        """Copy the fields of the parsed ``resource_arn`` onto this component."""
        self.resource_arn = resource_arn.arn
        self.partition = resource_arn.partition
        self._account = resource_arn.account
        self._region = resource_arn.region

    @staticmethod
    def from_resource_arn(arn: str) -> Resource:
        """Parse ``arn`` and build the matching ``Resource`` subclass.

        Dispatch is on the ARN's service and task type:
        ``lambda``/``function`` -> ``LambdaResource``,
        ``states``/``activity`` -> ``ActivityResource``,
        any other ``states`` task type -> ``ServiceResource``.
        """
        parsed = ResourceARN.from_arn(arn)
        kind = (parsed.service, parsed.task_type)
        if kind == ("lambda", "function"):
            return LambdaResource(resource_arn=parsed)
        if kind == ("states", "activity"):
            return ActivityResource(resource_arn=parsed)
        if parsed.service == "states":
            return ServiceResource(resource_arn=parsed)
        # NOTE(review): any other service falls through and implicitly
        # returns None despite the ``Resource`` return annotation.

    def _eval_runtime_part(self, env: Environment) -> ResourceRuntimePart:
        """Resolve the effective region and account for this resource.

        Values embedded in the ARN win; an empty value falls back to the
        corresponding field of ``env.aws_execution_details``.
        """
        region = self._region or env.aws_execution_details.region
        account = self._account or env.aws_execution_details.account
        return ResourceRuntimePart(account=account, region=region)

    def _eval_body(self, env: Environment) -> None:
        """Push the resolved ``ResourceRuntimePart`` onto ``env.stack``."""
        env.stack.append(self._eval_runtime_part(env=env))
class ActivityResource(Resource):
    """Resource whose ARN refers to an activity."""

    # Resource name taken from the parsed ARN.
    name: Final[str]

    def __init__(self, resource_arn: ResourceARN):
        """Initialise the base fields, then keep the ARN's ``name``."""
        super().__init__(resource_arn)
        self.name = resource_arn.name
class LambdaResource(Resource):
    """Resource whose ARN refers to a Lambda function."""

    # Resource name taken from the parsed ARN.
    function_name: Final[str]

    def __init__(self, resource_arn: ResourceARN):
        """Initialise the base fields, then keep the ARN's ``name``."""
        super().__init__(resource_arn)
        self.function_name = resource_arn.name
class ServiceResource(Resource):
    """Resource describing a service integration call.

    Attributes are derived from the parsed ARN:

    * ``service_name`` is the ARN's task type.
    * ``api_name`` / ``api_action`` come from the ARN's name. A name of the
      form ``api:action`` is split on ":" (only the first two parts are
      used); a name without ":" is the action, and ``api_name`` defaults to
      ``service_name``.
    * ``condition`` is the ``ResourceCondition`` matching the ARN's option,
      or ``None`` when the option is empty.

    Raises:
        RuntimeError: if the ARN carries a non-empty option that is not one
            of the ``ResourceCondition`` values.
    """

    service_name: Final[str]
    api_name: Final[str]
    api_action: Final[str]
    condition: Final[Optional[str]]

    def __init__(self, resource_arn: ResourceARN):
        super().__init__(resource_arn=resource_arn)
        self.service_name = resource_arn.task_type

        # str.split always yields at least one element, so there are only
        # two shapes to handle: "action" and "api:action[:...]".
        name_parts = resource_arn.name.split(":")
        if len(name_parts) == 1:
            self.api_name = self.service_name
            self.api_action = resource_arn.name
        else:
            self.api_name = name_parts[0]
            self.api_action = name_parts[1]

        # Resolve the condition into a local first so the Final attribute is
        # assigned exactly once.
        condition: Optional[str] = None
        option = resource_arn.option
        if option:
            if option == ResourceCondition.WaitForTaskToken:
                condition = ResourceCondition.WaitForTaskToken
            elif option == ResourceCondition.Sync:
                condition = ResourceCondition.Sync
            elif option == ResourceCondition.Sync2:
                condition = ResourceCondition.Sync2
            else:
                raise RuntimeError(f"Unsupported condition '{option}'.")
        self.condition = condition
|