1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
|
from __future__ import annotations
import abc
import datetime
import logging
from abc import ABC
from typing import Any, Final, Optional
from moto.stepfunctions.parser.api import (
ExecutionFailedEventDetails,
HistoryEventExecutionDataDetails,
HistoryEventType,
StateEnteredEventDetails,
StateExitedEventDetails,
TaskFailedEventDetails,
)
from moto.stepfunctions.parser.asl.component.common.assign.assign_decl import AssignDecl
from moto.stepfunctions.parser.asl.component.common.comment import Comment
from moto.stepfunctions.parser.asl.component.common.error_name.failure_event import (
FailureEvent,
FailureEventException,
)
from moto.stepfunctions.parser.asl.component.common.error_name.states_error_name import (
StatesErrorName,
)
from moto.stepfunctions.parser.asl.component.common.error_name.states_error_name_type import (
StatesErrorNameType,
)
from moto.stepfunctions.parser.asl.component.common.flow.end import End
from moto.stepfunctions.parser.asl.component.common.flow.next import Next
from moto.stepfunctions.parser.asl.component.common.outputdecl import Output
from moto.stepfunctions.parser.asl.component.common.path.input_path import (
InputPath,
)
from moto.stepfunctions.parser.asl.component.common.path.output_path import OutputPath
from moto.stepfunctions.parser.asl.component.common.query_language import (
QueryLanguage,
QueryLanguageMode,
)
from moto.stepfunctions.parser.asl.component.common.string.string_expression import (
JSONPATH_ROOT_PATH,
StringJsonPath,
)
from moto.stepfunctions.parser.asl.component.eval_component import EvalComponent
from moto.stepfunctions.parser.asl.component.state.state_continue_with import (
ContinueWith,
ContinueWithEnd,
ContinueWithNext,
)
from moto.stepfunctions.parser.asl.component.state.state_props import StateProps
from moto.stepfunctions.parser.asl.component.state.state_type import StateType
from moto.stepfunctions.parser.asl.eval.environment import Environment
from moto.stepfunctions.parser.asl.eval.event.event_detail import EventDetails
from moto.stepfunctions.parser.asl.eval.program_state import ProgramRunning
from moto.stepfunctions.parser.asl.eval.states import StateData
from moto.stepfunctions.parser.asl.utils.encoding import to_json_str
from moto.stepfunctions.parser.asl.utils.json_path import NoSuchJsonPathError
from moto.stepfunctions.parser.quotas import is_within_size_quota
LOG = logging.getLogger(__name__)
class CommonStateField(EvalComponent, ABC):
name: str
query_language: QueryLanguage
# The state's type.
state_type: StateType
# There can be any number of terminal states per state machine. Only one of Next or End can
# be used in a state. Some state types, such as Choice, don't support or use the End field.
continue_with: ContinueWith
# Holds a human-readable description of the state.
comment: Optional[Comment]
# A path that selects a portion of the state's input to be passed to the state's state_task for processing.
# If omitted, it has the value $ which designates the entire input.
input_path: Optional[InputPath]
# A path that selects a portion of the state's output to be passed to the next state.
# If omitted, it has the value $ which designates the entire output.
output_path: Optional[OutputPath]
assign_decl: Optional[AssignDecl]
output: Optional[Output]
state_entered_event_type: Final[HistoryEventType]
state_exited_event_type: Final[Optional[HistoryEventType]]
def __init__(
self,
state_entered_event_type: HistoryEventType,
state_exited_event_type: Optional[HistoryEventType],
):
self.state_entered_event_type = state_entered_event_type
self.state_exited_event_type = state_exited_event_type
def from_state_props(self, state_props: StateProps) -> None:
self.name = state_props.name
self.query_language = state_props.get(QueryLanguage) or QueryLanguage()
self.state_type = state_props.get(StateType)
self.continue_with = (
ContinueWithEnd()
if state_props.get(End)
else ContinueWithNext(state_props.get(Next))
)
self.comment = state_props.get(Comment)
self.assign_decl = state_props.get(AssignDecl)
# JSONPath sub-productions.
if self.query_language.query_language_mode == QueryLanguageMode.JSONPath:
self.input_path = state_props.get(InputPath) or InputPath(
StringJsonPath(JSONPATH_ROOT_PATH)
)
self.output_path = state_props.get(OutputPath) or OutputPath(
StringJsonPath(JSONPATH_ROOT_PATH)
)
self.output = None
# JSONata sub-productions.
else:
self.input_path = None
self.output_path = None
self.output = state_props.get(Output)
def _set_next(self, env: Environment) -> None:
if env.next_state_name != self.name:
# Next was already overridden.
return
if isinstance(self.continue_with, ContinueWithNext):
env.next_state_name = self.continue_with.next_state.name
elif isinstance(
self.continue_with, ContinueWithEnd
): # This includes ContinueWithSuccess
env.set_ended()
else:
LOG.error(
"Could not handle ContinueWith type of '%s'.", type(self.continue_with)
)
def _is_language_query_jsonpath(self) -> bool:
return self.query_language.query_language_mode == QueryLanguageMode.JSONPath
def _get_state_entered_event_details(
self, env: Environment
) -> StateEnteredEventDetails:
return StateEnteredEventDetails(
name=self.name,
input=to_json_str(env.states.get_input(), separators=(",", ":")),
inputDetails=HistoryEventExecutionDataDetails(
truncated=False # Always False for api calls.
),
)
def _get_state_exited_event_details(
self, env: Environment
) -> StateExitedEventDetails:
event_details = StateExitedEventDetails(
name=self.name,
output=to_json_str(env.states.get_input(), separators=(",", ":")),
outputDetails=HistoryEventExecutionDataDetails(
truncated=False # Always False for api calls.
),
)
# TODO add typing when these become available in boto.
assigned_variables = env.variable_store.get_assigned_variables()
env.variable_store.reset_tracing()
if assigned_variables:
event_details["assignedVariables"] = assigned_variables # noqa
event_details["assignedVariablesDetails"] = {"truncated": False} # noqa
return event_details
def _verify_size_quota(self, env: Environment, value: Any) -> None:
is_within: bool = is_within_size_quota(value)
if is_within:
return
error_type = StatesErrorNameType.StatesStatesDataLimitExceeded
cause = (
f"The state/task '{self.name}' returned a result with a size exceeding "
f"the maximum number of bytes service limit."
)
raise FailureEventException(
failure_event=FailureEvent(
env=env,
error_name=StatesErrorName(typ=error_type),
event_type=HistoryEventType.TaskFailed,
event_details=EventDetails(
executionFailedEventDetails=ExecutionFailedEventDetails(
error=error_type.to_name(),
cause=cause,
)
),
)
)
def _eval_state_input(self, env: Environment) -> None:
# Filter the input onto the stack.
if self.input_path:
self.input_path.eval(env)
else:
env.stack.append(env.states.get_input())
@abc.abstractmethod
def _eval_state(self, env: Environment) -> None: ...
def _eval_state_output(self, env: Environment) -> None:
# Process output value as next state input.
if self.output_path:
self.output_path.eval(env=env)
elif self.output:
self.output.eval(env=env)
else:
current_output = env.stack.pop()
env.states.reset(input_value=current_output)
def _eval_body(self, env: Environment) -> None:
env.event_manager.add_event(
context=env.event_history_context,
event_type=self.state_entered_event_type,
event_details=EventDetails(
stateEnteredEventDetails=self._get_state_entered_event_details(env=env)
),
)
env.states.context_object.context_object_data["State"] = StateData(
EnteredTime=datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
Name=self.name,
)
self._eval_state_input(env=env)
try:
self._eval_state(env)
except NoSuchJsonPathError as no_such_json_path_error:
data_json_str = to_json_str(no_such_json_path_error.data)
cause = (
f"The JSONPath '{no_such_json_path_error.json_path}' specified for the field '{env.next_field_name}' "
f"could not be found in the input '{data_json_str}'"
)
raise FailureEventException(
failure_event=FailureEvent(
env=env,
error_name=StatesErrorName(typ=StatesErrorNameType.StatesRuntime),
event_type=HistoryEventType.TaskFailed,
event_details=EventDetails(
taskFailedEventDetails=TaskFailedEventDetails(
error=StatesErrorNameType.StatesRuntime.to_name(),
cause=cause,
)
),
)
)
if not isinstance(env.program_state(), ProgramRunning):
return
self._eval_state_output(env=env)
self._verify_size_quota(env=env, value=env.states.get_input())
self._set_next(env)
if self.state_exited_event_type is not None:
env.event_manager.add_event(
context=env.event_history_context,
event_type=self.state_exited_event_type,
event_details=EventDetails(
stateExitedEventDetails=self._get_state_exited_event_details(
env=env
),
),
)
|