1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
import json
from unittest import SkipTest
from uuid import uuid4
import boto3
from moto import mock_aws, settings
from . import verify_execution_result
@mock_aws(config={"stepfunctions": {"execute_state_machine": True}})
def test_state_machine_calling_sqs_with_heartbeat():
if not settings.TEST_DECORATOR_MODE:
raise SkipTest("No point in testing this in ServerMode")
sqs = boto3.client("sqs", "us-east-1")
sfn = boto3.client("stepfunctions", "us-east-1")
queue_name = f"queue{uuid4()}"
queue_url = sqs.create_queue(QueueName=queue_name)["QueueUrl"]
message_txt = "test_message_txt"
exec_input = {"QueueUrl": queue_url, "Message": message_txt}
expected_status = "RUNNING"
tmpl_name = "services/sqs_heartbeat"
def _verify_result(client, execution, execution_arn):
resp = sqs.receive_message(QueueUrl=queue_url)
if "Messages" in resp:
task_token = json.loads(resp["Messages"][0]["Body"])["TaskToken"]
sfn.send_task_heartbeat(taskToken=task_token)
sfn.send_task_failure(taskToken=task_token)
return True
return False
verify_execution_result(
_verify_result, expected_status, tmpl_name, exec_input=json.dumps(exec_input)
)
@mock_aws(config={"stepfunctions": {"execute_state_machine": True}})
def test_state_machine_calling_sqs_with_task_success():
if not settings.TEST_DECORATOR_MODE:
raise SkipTest("No point in testing this in ServerMode")
sqs = boto3.client("sqs", "us-east-1")
sfn = boto3.client("stepfunctions", "us-east-1")
queue_name = f"queue{uuid4()}"
queue_url = sqs.create_queue(QueueName=queue_name)["QueueUrl"]
message_txt = "test_message_txt"
exec_input = {"QueueUrl": queue_url, "Message": message_txt}
expected_status = "RUNNING"
tmpl_name = "services/sqs_heartbeat"
def _verify_result(client, execution, execution_arn):
resp = sqs.receive_message(QueueUrl=queue_url)
if "Messages" in resp:
task_token = json.loads(resp["Messages"][0]["Body"])["TaskToken"]
sfn.send_task_success(taskToken=task_token, output="it's done")
return True
return False
verify_execution_result(
_verify_result, expected_status, tmpl_name, exec_input=json.dumps(exec_input)
)
|