File: test_activity_task.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (172 lines) | stat: -rw-r--r-- 5,237 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import re

import pytest
from freezegun import freeze_time

from moto.swf.exceptions import SWFWorkflowExecutionClosedError
from moto.swf.models import ActivityTask, ActivityType, Timeout

from ..utils import (
    ACTIVITY_TASK_TIMEOUTS,
    make_workflow_execution,
    process_first_timeout,
)


def test_activity_task_creation():
    wfe = make_workflow_execution()
    task = ActivityTask(
        activity_id="my-activity-123",
        activity_type="foo",
        workflow_input="optional",
        scheduled_event_id=117,
        workflow_execution=wfe,
        timeouts=ACTIVITY_TASK_TIMEOUTS,
    )
    assert task.workflow_execution == wfe
    assert task.state == "SCHEDULED"
    assert re.match("[-a-z0-9]+", task.task_token)
    assert task.started_event_id is None

    task.start(123)
    assert task.state == "STARTED"
    assert task.started_event_id == 123

    task.complete()
    assert task.state == "COMPLETED"

    # NB: this doesn't make any sense for SWF, a task shouldn't go from a
    # "COMPLETED" state to a "FAILED" one, but this is an internal state on our
    # side and we don't care about invalid state transitions for now.
    task.fail()
    assert task.state == "FAILED"


def test_activity_task_full_dict_representation():
    wfe = make_workflow_execution()
    at = ActivityTask(
        activity_id="my-activity-123",
        activity_type=ActivityType("foo", "v1.0"),
        workflow_input="optional",
        scheduled_event_id=117,
        timeouts=ACTIVITY_TASK_TIMEOUTS,
        workflow_execution=wfe,
    )
    at.start(1234)

    fd = at.to_full_dict()
    assert fd["activityId"] == "my-activity-123"
    assert fd["activityType"]["version"] == "v1.0"
    assert fd["input"] == "optional"
    assert fd["startedEventId"] == 1234
    assert "taskToken" in fd
    assert fd["workflowExecution"] == wfe.to_short_dict()

    at.start(1234)
    fd = at.to_full_dict()
    assert fd["startedEventId"] == 1234


def test_activity_task_reset_heartbeat_clock():
    wfe = make_workflow_execution()

    with freeze_time("2015-01-01 12:00:00"):
        task = ActivityTask(
            activity_id="my-activity-123",
            activity_type="foo",
            workflow_input="optional",
            scheduled_event_id=117,
            timeouts=ACTIVITY_TASK_TIMEOUTS,
            workflow_execution=wfe,
        )

    assert task.last_heartbeat_timestamp == 1420113600.0

    with freeze_time("2015-01-01 13:00:00"):
        task.reset_heartbeat_clock()

    assert task.last_heartbeat_timestamp == 1420117200.0


def test_activity_task_first_timeout():
    wfe = make_workflow_execution()

    with freeze_time("2015-01-01 12:00:00"):
        task = ActivityTask(
            activity_id="my-activity-123",
            activity_type="foo",
            workflow_input="optional",
            scheduled_event_id=117,
            timeouts=ACTIVITY_TASK_TIMEOUTS,
            workflow_execution=wfe,
        )
        assert task.first_timeout() is None

    # activity task timeout is 300s == 5mins
    with freeze_time("2015-01-01 12:06:00"):
        assert isinstance(task.first_timeout(), Timeout)
        process_first_timeout(task)
        assert task.state == "TIMED_OUT"
        assert task.timeout_type == "HEARTBEAT"


def test_activity_task_first_timeout_with_heartbeat_timeout_none():
    wfe = make_workflow_execution()

    activity_task_timeouts = ACTIVITY_TASK_TIMEOUTS.copy()
    activity_task_timeouts["heartbeatTimeout"] = "NONE"

    with freeze_time("2015-01-01 12:00:00"):
        task = ActivityTask(
            activity_id="my-activity-123",
            activity_type="foo",
            workflow_input="optional",
            scheduled_event_id=117,
            timeouts=activity_task_timeouts,
            workflow_execution=wfe,
        )
        assert task.first_timeout() is None


def test_activity_task_cannot_timeout_on_closed_workflow_execution():
    with freeze_time("2015-01-01 12:00:00"):
        wfe = make_workflow_execution()
        wfe.start()

    with freeze_time("2015-01-01 13:58:00"):
        task = ActivityTask(
            activity_id="my-activity-123",
            activity_type="foo",
            workflow_input="optional",
            scheduled_event_id=117,
            timeouts=ACTIVITY_TASK_TIMEOUTS,
            workflow_execution=wfe,
        )

    with freeze_time("2015-01-01 14:10:00"):
        assert isinstance(task.first_timeout(), Timeout)
        assert isinstance(wfe.first_timeout(), Timeout)
        process_first_timeout(wfe)
        assert task.first_timeout() is None


def test_activity_task_cannot_change_state_on_closed_workflow_execution():
    wfe = make_workflow_execution()
    wfe.start()

    task = ActivityTask(
        activity_id="my-activity-123",
        activity_type="foo",
        workflow_input="optional",
        scheduled_event_id=117,
        timeouts=ACTIVITY_TASK_TIMEOUTS,
        workflow_execution=wfe,
    )
    wfe.complete(123)

    with pytest.raises(SWFWorkflowExecutionClosedError):
        task.timeout(Timeout(task, 0, "foo"))
    with pytest.raises(SWFWorkflowExecutionClosedError):
        task.complete()
    with pytest.raises(SWFWorkflowExecutionClosedError):
        task.fail()