File: utils.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (117 lines) | stat: -rw-r--r-- 3,674 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import boto3

from moto.core import DEFAULT_ACCOUNT_ID
from moto.swf.models import ActivityType, Domain, WorkflowExecution, WorkflowType

# Some useful constants
# Here are some activity timeouts we use in moto/swf tests ; they're extracted
# from semi-real world example, the goal is mostly to have predictable and
# intuitive behaviour in moto/swf own tests...
ACTIVITY_TASK_TIMEOUTS = {
    "heartbeatTimeout": "300",  # 5 mins
    "scheduleToStartTimeout": "1800",  # 30 mins
    "startToCloseTimeout": "1800",  # 30 mins
    "scheduleToCloseTimeout": "2700",  # 45 mins
}

# Some useful decisions
SCHEDULE_ACTIVITY_TASK_DECISION = {
    "decisionType": "ScheduleActivityTask",
    "scheduleActivityTaskDecisionAttributes": {
        "activityId": "my-activity-001",
        "activityType": {"name": "test-activity", "version": "v1.1"},
        "taskList": {"name": "activity-task-list"},
    },
}
for key, value in ACTIVITY_TASK_TIMEOUTS.items():
    SCHEDULE_ACTIVITY_TASK_DECISION["scheduleActivityTaskDecisionAttributes"][key] = (
        value
    )


# A test Domain
def get_basic_domain():
    return Domain("test-domain", "90", DEFAULT_ACCOUNT_ID, "us-east-1")


# A test WorkflowType
def _generic_workflow_type_attributes():
    return (
        ["test-workflow", "v1.0"],
        {
            "task_list": "queue",
            "default_child_policy": "ABANDON",
            "default_execution_start_to_close_timeout": "7200",
            "default_task_start_to_close_timeout": "300",
        },
    )


def _generic_workflow_type_attributes_boto3():
    return {
        "name": "test-workflow",
        "version": "v1.0",
        "defaultTaskList": {"name": "queue"},
        "defaultChildPolicy": "ABANDON",
        "defaultExecutionStartToCloseTimeout": "7200",
        "defaultTaskStartToCloseTimeout": "300",
    }


def get_basic_workflow_type():
    args, kwargs = _generic_workflow_type_attributes()
    return WorkflowType(*args, **kwargs)


def mock_basic_workflow_type(domain_name, client):
    kwargs = _generic_workflow_type_attributes_boto3()
    client.register_workflow_type(domain=domain_name, **kwargs)
    return client


# A test WorkflowExecution
def make_workflow_execution(**kwargs):
    domain = get_basic_domain()
    domain.add_type(ActivityType("test-activity", "v1.1"))
    wft = get_basic_workflow_type()
    return WorkflowExecution(domain, wft, "ab1234", **kwargs)


# Makes decision tasks start automatically on a given workflow
def auto_start_decision_tasks(wfe):
    wfe.schedule_decision_task = wfe.schedule_and_start_decision_task
    return wfe


# Setup a complete example workflow and return the connection object
def setup_workflow():
    client = boto3.client("swf", region_name="us-west-1")
    client.register_domain(
        name="test-domain",
        workflowExecutionRetentionPeriodInDays="60",
        description="A test domain",
    )
    mock_basic_workflow_type("test-domain", client)
    client.register_activity_type(
        domain="test-domain",
        name="test-activity",
        version="v1.1",
        defaultTaskHeartbeatTimeout="600",
        defaultTaskScheduleToCloseTimeout="600",
        defaultTaskScheduleToStartTimeout="600",
        defaultTaskStartToCloseTimeout="600",
    )
    wfe = client.start_workflow_execution(
        domain="test-domain",
        workflowId="uid-abcd1234",
        workflowType={"name": "test-workflow", "version": "v1.0"},
    )
    client.run_id = wfe["runId"]
    return client


# A helper for processing the first timeout on a given object
def process_first_timeout(obj):
    _timeout = obj.first_timeout()
    if _timeout:
        obj.timeout(_timeout)