1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
import copy
from typing import Final, Optional
from jsonpath_ng import parse
from moto.stepfunctions.parser.asl.component.eval_component import EvalComponent
from moto.stepfunctions.parser.asl.eval.environment import Environment
class ResultPath(EvalComponent):
    """Evaluation component that applies a result-path expression.

    The component holds the raw JSONPath source string (or ``None``) and,
    when evaluated, combines the value on top of the environment stack with
    the current state's input.
    """

    # JSONPath root expression. Not referenced inside this class; presumably
    # the value callers fall back to when no path is configured - verify
    # against the call sites.
    DEFAULT_PATH: Final[str] = "$"

    # Raw JSONPath source, or None when the evaluated output is discarded.
    result_path_src: Final[Optional[str]]

    def __init__(self, result_path_src: Optional[str]):
        """Store the raw result-path source.

        Args:
            result_path_src: JSONPath expression text, or ``None`` to make
                the state's input become the output unchanged.
        """
        self.result_path_src = result_path_src

    def _eval_body(self, env: Environment) -> None:
        """Leave the state's output on ``env.stack``.

        With a path configured, the value popped from the stack is written
        into the state input at that path and the result is pushed back.
        Without a path, the stack is emptied and the state input alone is
        pushed.
        """
        original_input = env.states.get_input()
        path_src = self.result_path_src

        if path_src is not None:
            # Merge the produced value into the input at the configured path.
            # The value is deep-copied so the stored result does not alias
            # the object that was on the stack.
            # NOTE(review): update_or_create may modify `original_input` in
            # place - confirm whether get_input() already returns a copy.
            produced = env.stack.pop()
            merged = parse(path_src).update_or_create(
                original_input, copy.deepcopy(produced)
            )
            env.stack.append(merged)
            return

        # No path: discard whatever is on the stack and set the output to be
        # the state's input.
        env.stack.clear()
        env.stack.append(original_input)
|