File: test_stepfunctions_idempotency.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (78 lines) | stat: -rw-r--r-- 3,062 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import os
from unittest import SkipTest, mock

import pytest
from botocore.exceptions import ClientError

from moto import mock_aws, settings
from moto.stepfunctions.models import StepFunctionBackend, stepfunctions_backends
from tests import DEFAULT_ACCOUNT_ID

from .parser import (
    verify_execution_result,
)


@mock_aws
@mock.patch.dict(os.environ, {"SF_EXECUTION_HISTORY_TYPE": "FAILURE"})
def test_create_state_machine_twice_after_failure():
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Don't need to test this in ServerMode")

    def _verify_result(client, execution, execution_arn):
        name = execution["name"]
        arn = execution["stateMachineArn"]
        execution_arn = execution["executionArn"]

        # Execution fails if we re-start it after failure
        with pytest.raises(ClientError) as exc:
            client.start_execution(name=name, stateMachineArn=arn)
        err = exc.value.response["Error"]
        assert err["Code"] == "ExecutionAlreadyExists"
        assert err["Message"] == f"Execution Already Exists: '{execution_arn}'"

    verify_execution_result(_verify_result, "FAILED", "failure")


@mock_aws
def test_create_state_machine_twice_after_success():
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Don't need to test this in ServerMode")

    def _verify_result(client, execution, execution_arn):
        name = execution["name"]
        arn = execution["stateMachineArn"]
        execution_arn = execution["executionArn"]

        if execution["status"] == "RUNNING":
            # We can start the execution just fine
            idempotent = client.start_execution(name=name, stateMachineArn=arn)
            assert idempotent["executionArn"] == execution_arn

            # Manually mark our execution as finished
            # (Because we don't actually execute anything, the status is always 'RUNNING' if we don't do this manually)
            backend: StepFunctionBackend = stepfunctions_backends[DEFAULT_ACCOUNT_ID][
                "us-east-1"
            ]
            machine = backend.describe_state_machine(arn)
            execution = next(
                (x for x in machine.executions if x.execution_arn == execution_arn),
                None,
            )
            execution.status = "SUCCEEDED"

            # We're not done yet - we should check in on the progress later
            return False
        elif execution["status"] == "SUCCEEDED":
            # Execution fails if we re-start it after it finishes
            with pytest.raises(ClientError) as exc:
                client.start_execution(name=name, stateMachineArn=arn)
            err = exc.value.response["Error"]
            assert err["Code"] == "ExecutionAlreadyExists"
            assert err["Message"] == f"Execution Already Exists: '{execution_arn}'"

            # Execution finished, and we verified our error exception
            # Return True to indicate we can tear down our StateMachine
            return True

    verify_execution_result(_verify_result, None, tmpl_name="wait_1")