1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
|
import re
import pytest
from freezegun import freeze_time
from moto.swf.exceptions import SWFWorkflowExecutionClosedError
from moto.swf.models import ActivityTask, ActivityType, Timeout
from ..utils import (
ACTIVITY_TASK_TIMEOUTS,
make_workflow_execution,
process_first_timeout,
)
def test_activity_task_creation():
wfe = make_workflow_execution()
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",
workflow_input="optional",
scheduled_event_id=117,
workflow_execution=wfe,
timeouts=ACTIVITY_TASK_TIMEOUTS,
)
assert task.workflow_execution == wfe
assert task.state == "SCHEDULED"
assert re.match("[-a-z0-9]+", task.task_token)
assert task.started_event_id is None
task.start(123)
assert task.state == "STARTED"
assert task.started_event_id == 123
task.complete()
assert task.state == "COMPLETED"
# NB: this doesn't make any sense for SWF, a task shouldn't go from a
# "COMPLETED" state to a "FAILED" one, but this is an internal state on our
# side and we don't care about invalid state transitions for now.
task.fail()
assert task.state == "FAILED"
def test_activity_task_full_dict_representation():
wfe = make_workflow_execution()
at = ActivityTask(
activity_id="my-activity-123",
activity_type=ActivityType("foo", "v1.0"),
workflow_input="optional",
scheduled_event_id=117,
timeouts=ACTIVITY_TASK_TIMEOUTS,
workflow_execution=wfe,
)
at.start(1234)
fd = at.to_full_dict()
assert fd["activityId"] == "my-activity-123"
assert fd["activityType"]["version"] == "v1.0"
assert fd["input"] == "optional"
assert fd["startedEventId"] == 1234
assert "taskToken" in fd
assert fd["workflowExecution"] == wfe.to_short_dict()
at.start(1234)
fd = at.to_full_dict()
assert fd["startedEventId"] == 1234
def test_activity_task_reset_heartbeat_clock():
wfe = make_workflow_execution()
with freeze_time("2015-01-01 12:00:00"):
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",
workflow_input="optional",
scheduled_event_id=117,
timeouts=ACTIVITY_TASK_TIMEOUTS,
workflow_execution=wfe,
)
assert task.last_heartbeat_timestamp == 1420113600.0
with freeze_time("2015-01-01 13:00:00"):
task.reset_heartbeat_clock()
assert task.last_heartbeat_timestamp == 1420117200.0
def test_activity_task_first_timeout():
wfe = make_workflow_execution()
with freeze_time("2015-01-01 12:00:00"):
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",
workflow_input="optional",
scheduled_event_id=117,
timeouts=ACTIVITY_TASK_TIMEOUTS,
workflow_execution=wfe,
)
assert task.first_timeout() is None
# activity task timeout is 300s == 5mins
with freeze_time("2015-01-01 12:06:00"):
assert isinstance(task.first_timeout(), Timeout)
process_first_timeout(task)
assert task.state == "TIMED_OUT"
assert task.timeout_type == "HEARTBEAT"
def test_activity_task_first_timeout_with_heartbeat_timeout_none():
wfe = make_workflow_execution()
activity_task_timeouts = ACTIVITY_TASK_TIMEOUTS.copy()
activity_task_timeouts["heartbeatTimeout"] = "NONE"
with freeze_time("2015-01-01 12:00:00"):
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",
workflow_input="optional",
scheduled_event_id=117,
timeouts=activity_task_timeouts,
workflow_execution=wfe,
)
assert task.first_timeout() is None
def test_activity_task_cannot_timeout_on_closed_workflow_execution():
with freeze_time("2015-01-01 12:00:00"):
wfe = make_workflow_execution()
wfe.start()
with freeze_time("2015-01-01 13:58:00"):
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",
workflow_input="optional",
scheduled_event_id=117,
timeouts=ACTIVITY_TASK_TIMEOUTS,
workflow_execution=wfe,
)
with freeze_time("2015-01-01 14:10:00"):
assert isinstance(task.first_timeout(), Timeout)
assert isinstance(wfe.first_timeout(), Timeout)
process_first_timeout(wfe)
assert task.first_timeout() is None
def test_activity_task_cannot_change_state_on_closed_workflow_execution():
wfe = make_workflow_execution()
wfe.start()
task = ActivityTask(
activity_id="my-activity-123",
activity_type="foo",
workflow_input="optional",
scheduled_event_id=117,
timeouts=ACTIVITY_TASK_TIMEOUTS,
workflow_execution=wfe,
)
wfe.complete(123)
with pytest.raises(SWFWorkflowExecutionClosedError):
task.timeout(Timeout(task, 0, "foo"))
with pytest.raises(SWFWorkflowExecutionClosedError):
task.complete()
with pytest.raises(SWFWorkflowExecutionClosedError):
task.fail()
|