# Shared helpers for the Step Functions parser tests: test decorators,
# execution polling/teardown utilities and IAM policy documents.
import json
from functools import wraps
from time import sleep
from typing import TYPE_CHECKING, Callable, TypeVar
from uuid import uuid4
import boto3
import requests
from moto import mock_aws, settings
from tests import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from tests import allow_aws_request
from tests.test_stepfunctions.parser.templates.templates import load_template
# ParamSpec is only needed by static type checkers, so it is imported (and P is
# created) under TYPE_CHECKING. Annotations that mention P are written as
# strings, so P is never looked up at runtime.
if TYPE_CHECKING:
    from typing_extensions import ParamSpec

    P = ParamSpec("P")

# T is used in an unquoted return annotation further down in this module, so it
# has to exist at runtime as well.
T = TypeVar("T")
def aws_verified(func):
    """
    Function that is verified to work against AWS.
    Can be run against AWS at any time by setting:
      MOTO_TEST_ALLOW_AWS_REQUEST=true

    If this environment variable is not set, the function runs in a `mock_aws` context.

    Inside the mock context the `execute_state_machine` setting is switched on
    before calling the function and switched off again afterwards, including
    when the function raises, so a failing test cannot leak the setting into
    later tests.

    Positional and keyword arguments are forwarded to the wrapped function
    unchanged.
    """

    @wraps(func)
    def pagination_wrapper(*args, **kwargs):
        if allow_aws_request():
            return func(*args, **kwargs)

        with mock_aws():
            requests.post(
                f"http://{base_url}/moto-api/config",
                json={"stepfunctions": {"execute_state_machine": True}},
            )
            try:
                return func(*args, **kwargs)
            finally:
                # Always restore the setting, even if the test body failed.
                requests.post(
                    f"http://{base_url}/moto-api/config",
                    json={"stepfunctions": {"execute_state_machine": False}},
                )

    return pagination_wrapper
def verify_execution_result(
    _verify_result, expected_status, tmpl_name, exec_input=None, sleep_time=0
):
    """
    Run a state machine built from a template and verify its execution.

    Creates a throw-away IAM role, creates a state machine from the template
    `tmpl_name`, starts one execution and then polls `describe_execution` up
    to 30 times. The role and the state machine are always removed again,
    regardless of how the polling ends.

    :param _verify_result: callable invoked as
        `_verify_result(client, execution, execution_arn)`. Returning `False`
        means "not ready yet, keep polling"; any other return value ends the
        polling successfully. Exceptions propagate to the caller.
    :param expected_status: execution status that must be reached before
        `_verify_result` is called, or `None` to call it on every poll.
    :param tmpl_name: template name passed to `load_template`.
    :param exec_input: optional input passed to the execution.
    :param sleep_time: seconds to wait between creating the role and creating
        the state machine.
    :raises AssertionError: if polling is exhausted without `_verify_result`
        accepting the execution.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    role_name = f"sfn_role_{str(uuid4())[0:6]}"
    sfn_role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(sfn_role_policy),
        Path="/",
    )["Role"]["Arn"]
    # NOTE: the policy name mentions Lambda although the document grants
    # DynamoDB access. _teardown deletes the policy by this exact name, so the
    # two must stay in sync.
    iam.put_role_policy(
        PolicyDocument=json.dumps(sfn_allow_dynamodb),
        PolicyName="allowLambdaInvoke",
        RoleName=role_name,
    )
    sleep(sleep_time)

    client = boto3.client("stepfunctions", region_name="us-east-1")
    execution_arn, state_machine_arn = _start_execution(
        client, load_template(tmpl_name), exec_input, sfn_role
    )

    try:
        for _ in range(30):
            execution = client.describe_execution(executionArn=execution_arn)
            if expected_status is None or execution["status"] == expected_status:
                result = _verify_result(client, execution, execution_arn)
                if result is not False:
                    break
            # Poll slowly against real AWS, quickly against the mock.
            sleep(10 if allow_aws_request() else 0.1)
        else:
            raise AssertionError("Should have failed already")
    finally:
        # A single cleanup point: runs on success, on a verification failure,
        # on exhausted polling and on unexpected errors while polling.
        _teardown(client, iam, role_name, state_machine_arn)
def _teardown(client, iam, role_name, state_machine_arn):
    """
    Remove the state machine and the IAM role created for a test run.

    Executions reported as RUNNING are stopped first, because deletion will
    only occur after all executions have finished. A failure to stop one is
    printed and otherwise ignored.
    """
    listing = client.list_executions(stateMachineArn=state_machine_arn)
    for execution in listing["executions"]:
        if execution["status"] != "RUNNING":
            continue
        try:
            client.stop_execution(executionArn=execution["executionArn"])
        except Exception as err:
            # list_executions sometimes reports an execution as RUNNING even
            # though it has already failed. Stopping it again fails, but that
            # must not abort the cleanup - just log it for visibility.
            print(f"Unable to stop 'running' execution in {state_machine_arn}")  # noqa
            print(err)  # noqa

    # Delete the actual resources
    client.delete_state_machine(stateMachineArn=state_machine_arn)
    iam.delete_role_policy(RoleName=role_name, PolicyName="allowLambdaInvoke")
    iam.delete_role(RoleName=role_name)
def _start_execution(client, definition, exec_input, sfn_role):
    """
    Create a uniquely named state machine and start one execution of it.

    `definition` is serialised to JSON for the state machine. The execution is
    always named "exec1"; `exec_input` is passed along only when it is truthy.

    Returns a tuple `(execution_arn, state_machine_arn)`.
    """
    machine_name = f"sfn_name_{str(uuid4())[0:6]}"
    state_machine_arn = client.create_state_machine(
        name=machine_name, definition=json.dumps(definition), roleArn=sfn_role
    )["stateMachineArn"]

    start_kwargs = {"name": "exec1", "stateMachineArn": state_machine_arn}
    if exec_input:
        start_kwargs["input"] = exec_input
    execution_arn = client.start_execution(**start_kwargs)["executionArn"]

    return execution_arn, state_machine_arn
def _get_default_role():
    """Return the ARN of the `unknown_sf_role` role in the default test account."""
    return f"arn:aws:iam::{ACCOUNT_ID}:role/unknown_sf_role"
# Host used for the `/moto-api/config` requests in this module: a local server
# when the tests run in server mode, otherwise the moto API host name.
if settings.TEST_SERVER_MODE:
    base_url = "localhost:5000"
else:
    base_url = "motoapi.amazonaws.com"
def enable_sfn_parser(func: "Callable[P, T]") -> "Callable[P, T]":
    """
    Decorator that runs `func` with `execute_state_machine` switched on.

    The setting is posted as True to the moto config endpoint before the call
    and posted as False afterwards, also when `func` raises.
    """

    def _set_execute_state_machine(enabled):
        requests.post(
            f"http://{base_url}/moto-api/config",
            json={"stepfunctions": {"execute_state_machine": enabled}},
        )

    @wraps(func)
    def wrapper(*args: "P.args", **kwargs: "P.kwargs") -> T:  # type: ignore
        _set_execute_state_machine(True)
        try:
            return func(*args, **kwargs)
        finally:
            _set_execute_state_machine(False)

    return wrapper
# Trust policy document: allows the Step Functions service principal
# (states.amazonaws.com) to assume the role via sts:AssumeRole.
sfn_role_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "states.amazonaws.com",
            },
            "Action": "sts:AssumeRole",
        },
    ],
}
# Permission policy document: allows lambda:InvokeFunction on every resource.
sfn_allow_lambda_invoke = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["lambda:InvokeFunction"],
            "Resource": ["*"],
        },
    ],
}
# Permission policy document: allows every DynamoDB action on every resource.
sfn_allow_dynamodb = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["dynamodb:*"],
            "Resource": ["*"],
        },
    ],
}
|