File: state_parallel.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (107 lines) | stat: -rw-r--r-- 4,441 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import copy
from typing import Optional

from moto.stepfunctions.parser.api import HistoryEventType
from moto.stepfunctions.parser.asl.component.common.catch.catch_outcome import (
    CatchOutcome,
)
from moto.stepfunctions.parser.asl.component.common.error_name.failure_event import (
    FailureEvent,
    FailureEventException,
)
from moto.stepfunctions.parser.asl.component.common.parargs import Parargs
from moto.stepfunctions.parser.asl.component.common.retry.retry_outcome import (
    RetryOutcome,
)
from moto.stepfunctions.parser.asl.component.state.exec.execute_state import (
    ExecutionState,
)
from moto.stepfunctions.parser.asl.component.state.exec.state_parallel.branches_decl import (
    BranchesDecl,
)
from moto.stepfunctions.parser.asl.component.state.state_props import StateProps
from moto.stepfunctions.parser.asl.eval.environment import Environment


class StateParallel(ExecutionState):
    """Execution state that evaluates a set of branches and records Parallel* history events.

    A ``BranchesDecl`` must be present in the state's props. A ``Parargs`` declaration is
    optional; when set it is evaluated before the branches run.
    """

    # Branches (Required)
    # An array of objects that specify state machines to execute in parallel. Each such state
    # machine object must have fields named States and StartAt, whose meanings are exactly
    # like those in the top level of a state machine.
    branches: BranchesDecl
    # Optional declaration evaluated ahead of the branches; checked against None before use.
    parargs: Optional[Parargs]

    def __init__(self):
        """Initialise the base state with the Parallel entered/exited history event types."""
        super().__init__(
            state_exited_event_type=HistoryEventType.ParallelStateExited,
            state_entered_event_type=HistoryEventType.ParallelStateEntered,
        )

    def from_state_props(self, state_props: StateProps) -> None:
        """Populate this state from ``state_props``.

        The ``BranchesDecl`` lookup is handed a ``ValueError`` through ``raise_on_missing``;
        the ``Parargs`` lookup is made without one.
        """
        super().from_state_props(state_props)
        missing_branches_error = ValueError(
            f"Missing Branches definition in props '{state_props}'."
        )
        self.branches = state_props.get(
            typ=BranchesDecl, raise_on_missing=missing_branches_error
        )
        self.parargs = state_props.get(Parargs)

    def _eval_execution(self, env: Environment) -> None:
        """Evaluate the branches between a ParallelStateStarted and a ParallelStateSucceeded event."""
        env.event_manager.add_event(
            event_type=HistoryEventType.ParallelStateStarted,
            context=env.event_history_context,
        )
        self.branches.eval(env)
        # Only reached when the branch evaluation above did not raise.
        env.event_manager.add_event(
            event_type=HistoryEventType.ParallelStateSucceeded,
            context=env.event_history_context,
            update_source_event_id=False,
        )

    def _eval_state(self, env: Environment) -> None:
        """Run the state, retrying or catching failures as configured.

        The branch input is popped from the stack once, deep-copied, and pushed back before
        every attempt. A ``FailureEventException`` raised by an attempt is turned into an
        error output, then offered to the retry handler and the catch handler (when set)
        before being passed to ``_handle_uncaught``.
        """
        # Initialise the retry counter for execution states.
        state_context = env.states.context_object.context_object_data["State"]
        state_context["RetryCount"] = 0

        # Branch input: the evaluated parameters when declared, otherwise the current memory state.
        if self.parargs is not None:
            self.parargs.eval(env=env)
        # Either way the input is copied by value, so branches cannot manipulate each other's
        # state, and kept around so it can be resubmitted after a failure.
        branch_input = copy.deepcopy(env.stack.pop())

        # Keep attempting until an attempt succeeds, the failure is caught, or the
        # environment stops running.
        while env.is_running():
            try:
                env.stack.append(branch_input)
                self._evaluate_with_timeout(env)
            except FailureEventException as failure_exc:
                failure: FailureEvent = self._from_error(env=env, ex=failure_exc)
                error_payload = self._construct_error_output_value(
                    failure_event=failure
                )
                env.states.set_error_output(error_payload)
                env.states.set_result(error_payload)

                # A retrier that accepts the failure sends us straight to the next attempt.
                if (
                    self.retry is not None
                    and self._handle_retry(env=env, failure_event=failure)
                    == RetryOutcome.CanRetry
                ):
                    continue

                env.event_manager.add_event(
                    event_type=HistoryEventType.ParallelStateFailed,
                    context=env.event_history_context,
                )

                if self.catch is not None:
                    self._handle_catch(env=env, failure_event=failure)
                    # The top of the stack is read (not popped) as the catch outcome.
                    if env.stack[-1] == CatchOutcome.Caught:
                        break

                self._handle_uncaught(env=env, failure_event=failure)
            else:
                # The attempt completed without a failure event: nothing left to do.
                break