File: __init__.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (175 lines) | stat: -rw-r--r-- 5,824 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import json
from functools import wraps
from time import sleep
from typing import TYPE_CHECKING, Callable, TypeVar
from uuid import uuid4

import boto3
import requests

from moto import mock_aws, settings
from tests import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from tests import allow_aws_request
from tests.test_stepfunctions.parser.templates.templates import load_template

if TYPE_CHECKING:
    # Only needed by static type checkers: every annotation in this module
    # that mentions `P` is written as a string, so `P` is never looked up at
    # runtime and `typing_extensions` does not have to be importable.
    from typing_extensions import ParamSpec

    P = ParamSpec("P")

# Return type of the function wrapped by `enable_sfn_parser`.
T = TypeVar("T")


def aws_verified(func):
    """
    Function that is verified to work against AWS.
    Can be run against AWS at any time by setting:
      MOTO_TEST_ALLOW_AWS_REQUEST=true

    If this environment variable is not set, the function runs in a `mock_aws` context.
    In that case the `execute_state_machine` setting of the moto config API is
    switched on for the duration of the call and always switched off again
    afterwards, even when the wrapped function raises.

    Positional and keyword arguments are forwarded to the wrapped function unchanged.
    """

    @wraps(func)
    def pagination_wrapper(*args, **kwargs):
        if allow_aws_request():
            return func(*args, **kwargs)

        with mock_aws():
            requests.post(
                f"http://{base_url}/moto-api/config",
                json={"stepfunctions": {"execute_state_machine": True}},
            )
            try:
                return func(*args, **kwargs)
            finally:
                # Reset the setting even if the test failed, so the enabled
                # state does not leak into tests that run afterwards.
                requests.post(
                    f"http://{base_url}/moto-api/config",
                    json={"stepfunctions": {"execute_state_machine": False}},
                )

    return pagination_wrapper


def verify_execution_result(
    _verify_result, expected_status, tmpl_name, exec_input=None, sleep_time=0
):
    """
    Run a state machine built from a template and verify the outcome of its execution.

    Creates a temporary IAM role (trust policy `sfn_role_policy`, inline policy
    `sfn_allow_dynamodb`), creates a state machine from `load_template(tmpl_name)`,
    starts one execution and polls `describe_execution` up to 30 times.
    Once the execution has been started, the role and the state machine are
    always removed again through `_teardown`, whether polling succeeds, fails
    or raises.

    :param _verify_result: callable invoked as
        `_verify_result(client, execution, execution_arn)`. Returning `False`
        means "not ready yet, poll again"; any other return value ends the
        polling successfully. Exceptions propagate to the caller.
    :param expected_status: execution status to wait for. With `None` the
        verification callable is invoked on every poll.
    :param tmpl_name: template name handed to `load_template`.
    :param exec_input: optional execution input, forwarded to `_start_execution`.
    :param sleep_time: seconds to sleep after creating the role and before
        creating the state machine.
    :raises AssertionError: if the verification did not succeed within 30 polls.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    role_name = f"sfn_role_{str(uuid4())[0:6]}"
    sfn_role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(sfn_role_policy),
        Path="/",
    )["Role"]["Arn"]
    iam.put_role_policy(
        PolicyDocument=json.dumps(sfn_allow_dynamodb),
        PolicyName="allowLambdaInvoke",
        RoleName=role_name,
    )
    sleep(sleep_time)

    client = boto3.client("stepfunctions", region_name="us-east-1")
    execution_arn, state_machine_arn = _start_execution(
        client, load_template(tmpl_name), exec_input, sfn_role
    )
    try:
        for _ in range(30):
            execution = client.describe_execution(executionArn=execution_arn)
            if expected_status is None or execution["status"] == expected_status:
                if _verify_result(client, execution, execution_arn) is not False:
                    return
            # Poll slowly against real AWS, quickly against the mock
            sleep(10 if allow_aws_request() else 0.1)
        raise AssertionError("Should have failed already")
    finally:
        # Single cleanup point: also covers errors raised by describe_execution
        # itself, which previously left the role and state machine behind.
        _teardown(client, iam, role_name, state_machine_arn)


def _teardown(client, iam, role_name, state_machine_arn):
    """
    Remove the state machine and the IAM role created for a single test run.

    Executions reported as RUNNING are stopped first (best effort), after which
    the state machine, the role's `allowLambdaInvoke` inline policy and the role
    itself are deleted, in that order.
    """
    # Deletion only happens once every execution has finished, so stop/cancel
    # whatever is still reported as running.
    listing = client.list_executions(stateMachineArn=state_machine_arn)
    still_running = (
        item for item in listing["executions"] if item["status"] == "RUNNING"
    )
    for item in still_running:
        try:
            client.stop_execution(executionArn=item["executionArn"])
        except Exception as err:
            # list_executions sometimes reports an execution as RUNNING even
            # though it has already failed. Stopping it again fails, but that
            # must not abort the cleanup - log it for visibility and carry on.
            print(f"Unable to stop 'running' execution in {state_machine_arn}")  # noqa
            print(err)  # noqa

    # Delete the actual resources
    client.delete_state_machine(stateMachineArn=state_machine_arn)
    iam.delete_role_policy(RoleName=role_name, PolicyName="allowLambdaInvoke")
    iam.delete_role(RoleName=role_name)


def _start_execution(client, definition, exec_input, sfn_role):
    """
    Create a state machine with a random name and start one execution of it.

    `definition` is serialised with `json.dumps`. The execution is always named
    "exec1"; `exec_input` is passed on as `input` only when it is truthy.

    Returns a tuple `(execution_arn, state_machine_arn)`.
    """
    machine_name = f"sfn_name_{str(uuid4())[:6]}"
    state_machine_arn = client.create_state_machine(
        name=machine_name, definition=json.dumps(definition), roleArn=sfn_role
    )["stateMachineArn"]

    start_kwargs = {"name": "exec1", "stateMachineArn": state_machine_arn}
    if exec_input:
        # A falsy input (None, "") is left out of the request altogether
        start_kwargs["input"] = exec_input
    started = client.start_execution(**start_kwargs)

    return started["executionArn"], state_machine_arn


def _get_default_role():
    """Return the ARN of the role `unknown_sf_role` in the default test account."""
    arn_parts = ("arn:aws:iam::", ACCOUNT_ID, ":role/unknown_sf_role")
    return "".join(arn_parts)


# Host used to build the `/moto-api/config` URLs in `aws_verified` and
# `enable_sfn_parser`; it depends on whether the tests run in server mode.
if settings.TEST_SERVER_MODE:
    base_url = "localhost:5000"
else:
    base_url = "motoapi.amazonaws.com"


def enable_sfn_parser(func: "Callable[P, T]") -> "Callable[P, T]":
    """
    Decorator that switches on `execute_state_machine` around a call.

    The setting is posted to the moto config API before `func` runs and is
    switched off again afterwards, also when `func` raises. Arguments and the
    return value of `func` are passed through unchanged.
    """

    def _set_execution(enabled: bool) -> None:
        # The URL is built on every call so the current `base_url` is used
        requests.post(
            f"http://{base_url}/moto-api/config",
            json={"stepfunctions": {"execute_state_machine": enabled}},
        )

    @wraps(func)
    def parser_enabled(*args: "P.args", **kwargs: "P.kwargs") -> T:  # type: ignore
        _set_execution(True)
        try:
            return func(*args, **kwargs)
        finally:
            _set_execution(False)

    return parser_enabled


# Trust policy that allows the `states.amazonaws.com` service principal to
# assume the role. `verify_execution_result` passes it as the
# AssumeRolePolicyDocument of the temporary role it creates.
sfn_role_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "states.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# Policy document that allows `lambda:InvokeFunction` on every resource.
sfn_allow_lambda_invoke = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": ["lambda:InvokeFunction"], "Resource": ["*"]}
    ],
}

# Policy document that allows every DynamoDB action on every resource.
# `verify_execution_result` attaches it to the temporary role as an inline
# policy (under the policy name "allowLambdaInvoke").
sfn_allow_dynamodb = {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Action": ["dynamodb:*"], "Resource": ["*"]}],
}