File: test_stepfunctions_lambda_integration.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (322 lines) | stat: -rw-r--r-- 11,493 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
import json
import os
from functools import wraps
from time import sleep
from uuid import uuid4

import boto3
import pytest
import requests
from botocore.exceptions import ClientError

from moto import mock_aws
from tests.test_awslambda.utilities import (
    get_test_zip_file1,
)

from ...markers import requires_docker
from . import base_url, sfn_allow_lambda_invoke, sfn_role_policy
from .templates.comments.comments_as_per_docs import templ as comments_templ

lambda_state_machine = {
    "Comment": "A simple AWS Step Functions state machine that calls two Lambda Functions",
    "StartAt": "Open Case",
    "States": {
        "Open Case": {"Type": "Task", "Resource": "TODO", "Next": "Send Email"},
        "Send Email": {"Type": "Task", "Resource": "TODO", "End": True},
    },
}


def aws_verified(func):
    """
    Function that is verified to work against AWS.
    Can be run against AWS at any time by setting:
      MOTO_TEST_ALLOW_AWS_REQUEST=true

    If this environment variable is not set, the function runs in a `mock_aws` context.

    This decorator will:
      - Create an IAM-role that can be used by AWSLambda functions table
      - Run the test
      - Delete the role
    """

    @wraps(func)
    def pagination_wrapper():
        role_name = "moto_test_role_" + str(uuid4())[0:6]

        allow_aws_request = (
            os.environ.get("MOTO_TEST_ALLOW_AWS_REQUEST", "false").lower() == "true"
        )

        if allow_aws_request:
            return create_role_and_test(role_name, sleep_time=10)
        else:
            with mock_aws():
                requests.post(
                    f"http://{base_url}/moto-api/config",
                    json={"stepfunctions": {"execute_state_machine": True}},
                )
                resp = create_role_and_test(role_name, sleep_time=0)
                requests.post(
                    f"http://{base_url}/moto-api/config",
                    json={"stepfunctions": {"execute_state_machine": False}},
                )
                return resp

    def create_role_and_test(role_name, sleep_time):
        policy_doc = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "LambdaAssumeRole",
                    "Effect": "Allow",
                    "Principal": {"Service": "lambda.amazonaws.com"},
                    "Action": "sts:AssumeRole",
                }
            ],
        }
        iam = boto3.client("iam", region_name="us-east-1")
        iam_role_arn = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(policy_doc),
            Path="/",
        )["Role"]["Arn"]

        fn_name = f"fn_for_{role_name}"
        try:
            fn_arn = create_function(iam_role_arn, role_name, fn_name)

            resp = func(fn_name, fn_arn, sleep_time)
        finally:
            _lambda = boto3.client("lambda", "us-east-1")
            _lambda.delete_function(FunctionName=fn_name)

            iam.delete_role(RoleName=role_name)

        return resp

    def create_function(role_arn: str, role_name: str, fn_name: str):
        iam = boto3.client("iam", region_name="us-east-1")

        # The waiter is normally used to wait until a resource is ready
        wait_for_role = iam.get_waiter("role_exists")
        wait_for_role.wait(RoleName=role_name)

        # HOWEVER:
        # Just because the role exists, doesn't mean it can be used by Lambda
        # The only reliable way to check if our role is ready:
        #   - try to create a function and see if it succeeds
        #
        # The alternative is to wait between 5 and 10 seconds, which is not a great solution either
        _lambda = boto3.client("lambda", "us-east-1")
        for _ in range(15):
            try:
                fn = _lambda.create_function(
                    FunctionName=fn_name,
                    Runtime="python3.11",
                    Role=role_arn,
                    Handler="lambda_function.lambda_handler",
                    Code={"ZipFile": get_test_zip_file1()},
                )
                return fn["FunctionArn"]

            except ClientError:
                sleep(1)
        raise Exception(
            f"Couldn't create test Lambda FN using IAM role {role_name}, probably because it wasn't ready in time"
        )

    return pagination_wrapper


@aws_verified
@pytest.mark.aws_verified
@pytest.mark.network
@requires_docker
def test_state_machine_calling_lambda_fn(fn_name=None, fn_arn=None, sleep_time=0):
    definition = lambda_state_machine.copy()
    definition["States"]["Open Case"]["Resource"] = fn_arn
    definition["States"]["Send Email"]["Resource"] = fn_arn

    iam = boto3.client("iam", region_name="us-east-1")
    role_name = f"sfn_role_{str(uuid4())[0:6]}"
    sfn_role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(sfn_role_policy),
        Path="/",
    )["Role"]["Arn"]
    iam.put_role_policy(
        PolicyDocument=json.dumps(sfn_allow_lambda_invoke),
        PolicyName="allowLambdaInvoke",
        RoleName=role_name,
    )
    sleep(sleep_time)

    client = boto3.client("stepfunctions", region_name="us-east-1")
    name = "sfn_" + str(uuid4())[0:6]
    exec_input = {"my": "input"}
    #
    response = client.create_state_machine(
        name=name, definition=json.dumps(definition), roleArn=sfn_role
    )
    state_machine_arn = response["stateMachineArn"]

    execution = client.start_execution(
        name="exec1", stateMachineArn=state_machine_arn, input=json.dumps(exec_input)
    )
    execution_arn = execution["executionArn"]

    for _ in range(10):
        execution = client.describe_execution(executionArn=execution_arn)
        if execution["status"] == "SUCCEEDED":
            assert "stopDate" in execution
            assert json.loads(execution["input"]) == exec_input
            assert json.loads(execution["output"]) == exec_input

            history = client.get_execution_history(executionArn=execution_arn)
            assert len(history["events"]) == 12
            assert [e["type"] for e in history["events"]] == [
                "ExecutionStarted",
                "TaskStateEntered",
                "LambdaFunctionScheduled",
                "LambdaFunctionStarted",
                "LambdaFunctionSucceeded",
                "TaskStateExited",
                "TaskStateEntered",
                "LambdaFunctionScheduled",
                "LambdaFunctionStarted",
                "LambdaFunctionSucceeded",
                "TaskStateExited",
                "ExecutionSucceeded",
            ]

            client.delete_state_machine(stateMachineArn=state_machine_arn)
            iam.delete_role_policy(RoleName=role_name, PolicyName="allowLambdaInvoke")
            iam.delete_role(RoleName=role_name)
            break
        sleep(1)
    else:
        client.delete_state_machine(stateMachineArn=state_machine_arn)
        iam.delete_role_policy(RoleName=role_name, PolicyName="allowLambdaInvoke")
        iam.delete_role(RoleName=role_name)
        raise AssertionError("Should have failed already")


@aws_verified
@pytest.mark.aws_verified
@pytest.mark.network
@requires_docker
def test_state_machine_calling_failing_lambda_fn(
    fn_name=None, fn_arn=None, sleep_time=0
):
    definition = lambda_state_machine.copy()
    definition["States"]["Open Case"]["Resource"] = fn_arn
    definition["States"]["Send Email"]["Resource"] = fn_arn

    iam = boto3.client("iam", region_name="us-east-1")
    role_name = f"sfn_role_{str(uuid4())[0:6]}"
    sfn_role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(sfn_role_policy),
        Path="/",
    )["Role"]["Arn"]
    iam.put_role_policy(
        PolicyDocument=json.dumps(sfn_allow_lambda_invoke),
        PolicyName="allowLambdaInvoke",
        RoleName=role_name,
    )
    sleep(sleep_time)

    client = boto3.client("stepfunctions", region_name="us-east-1")
    name = "sfn_" + str(uuid4())[0:6]
    exec_input = {"my": "input", "error": True}
    #
    response = client.create_state_machine(
        name=name, definition=json.dumps(definition), roleArn=sfn_role
    )
    state_machine_arn = response["stateMachineArn"]

    execution = client.start_execution(
        name="exec1", stateMachineArn=state_machine_arn, input=json.dumps(exec_input)
    )
    execution_arn = execution["executionArn"]

    for _ in range(10):
        execution = client.describe_execution(executionArn=execution_arn)
        if execution["status"] == "FAILED":
            assert json.loads(execution["cause"])["errorMessage"] == "I failed!"

            history = client.get_execution_history(executionArn=execution_arn)
            assert len(history["events"]) == 6
            assert [e["type"] for e in history["events"]] == [
                "ExecutionStarted",
                "TaskStateEntered",
                "LambdaFunctionScheduled",
                "LambdaFunctionStarted",
                "LambdaFunctionFailed",
                "ExecutionFailed",
            ]

            client.delete_state_machine(stateMachineArn=state_machine_arn)
            iam.delete_role_policy(RoleName=role_name, PolicyName="allowLambdaInvoke")
            iam.delete_role(RoleName=role_name)
            break
        sleep(1)
    else:
        client.delete_state_machine(stateMachineArn=state_machine_arn)
        iam.delete_role_policy(RoleName=role_name, PolicyName="allowLambdaInvoke")
        iam.delete_role(RoleName=role_name)
        raise AssertionError("Should have failed already")


@aws_verified
@pytest.mark.aws_verified
@pytest.mark.network
@requires_docker
def test_comments_on_lmb_function(fn_name=None, fn_arn=None, sleep_time=0):
    definition = comments_templ.copy()
    definition["States"]["TaskStateCatchRetry"]["Resource"] = fn_arn

    iam = boto3.client("iam", region_name="us-east-1")
    role_name = f"sfn_role_{str(uuid4())[0:6]}"
    sfn_role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(sfn_role_policy),
        Path="/",
    )["Role"]["Arn"]
    iam.put_role_policy(
        PolicyDocument=json.dumps(sfn_allow_lambda_invoke),
        PolicyName="allowLambdaInvoke",
        RoleName=role_name,
    )
    sleep(sleep_time)

    client = boto3.client("stepfunctions", region_name="us-east-1")
    name = "sfn_" + str(uuid4())[0:6]
    exec_input = {}
    #
    response = client.create_state_machine(
        name=name, definition=json.dumps(definition), roleArn=sfn_role
    )
    state_machine_arn = response["stateMachineArn"]

    execution = client.start_execution(
        name="exec1", stateMachineArn=state_machine_arn, input=json.dumps(exec_input)
    )
    execution_arn = execution["executionArn"]

    for _ in range(15):
        execution = client.describe_execution(executionArn=execution_arn)
        if execution["status"] == "SUCCEEDED":
            client.delete_state_machine(stateMachineArn=state_machine_arn)
            iam.delete_role_policy(RoleName=role_name, PolicyName="allowLambdaInvoke")
            iam.delete_role(RoleName=role_name)
            break
        sleep(1)
    else:
        client.delete_state_machine(stateMachineArn=state_machine_arn)
        iam.delete_role_policy(RoleName=role_name, PolicyName="allowLambdaInvoke")
        iam.delete_role(RoleName=role_name)
        raise AssertionError("Should have failed already")