1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
from typing import Final, Optional
from moto.stepfunctions.parser.api import HistoryEventType, TaskFailedEventDetails
from moto.stepfunctions.parser.asl.component.common.error_name.failure_event import (
FailureEvent,
FailureEventException,
)
from moto.stepfunctions.parser.asl.component.common.error_name.states_error_name import (
StatesErrorName,
)
from moto.stepfunctions.parser.asl.component.common.error_name.states_error_name_type import (
StatesErrorNameType,
)
from moto.stepfunctions.parser.asl.component.common.string.string_expression import (
StringSampler,
)
from moto.stepfunctions.parser.asl.component.eval_component import EvalComponent
from moto.stepfunctions.parser.asl.eval.environment import Environment
from moto.stepfunctions.parser.asl.eval.event.event_detail import EventDetails
from moto.stepfunctions.parser.asl.utils.json_path import NoSuchJsonPathError
class OutputPath(EvalComponent):
string_sampler: Final[Optional[StringSampler]]
def __init__(self, string_sampler: Optional[StringSampler]):
self.string_sampler = string_sampler
def _eval_body(self, env: Environment) -> None:
if self.string_sampler is None:
env.states.reset(input_value={})
return
try:
self.string_sampler.eval(env=env)
except NoSuchJsonPathError as no_such_json_path_error:
json_path = no_such_json_path_error.json_path
cause = f"Invalid path '{json_path}' : No results for path: $['{json_path[2:]}']"
raise FailureEventException(
failure_event=FailureEvent(
env=env,
error_name=StatesErrorName(typ=StatesErrorNameType.StatesRuntime),
event_type=HistoryEventType.TaskFailed,
event_details=EventDetails(
taskFailedEventDetails=TaskFailedEventDetails(
error=StatesErrorNameType.StatesRuntime.to_name(),
cause=cause,
)
),
)
)
output_value = env.stack.pop()
env.states.reset(output_value)
|