1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
|
"""
Tests for Layer1 of Simple Workflow
"""
import os
import unittest
import time
from boto.swf.layer1 import Layer1
from boto.swf import exceptions as swf_exceptions
# A standard AWS account is permitted a maximum of 100 SWF domains,
# registered or deprecated. Deleting deprecated domains on demand does
# not appear possible. Therefore, these tests reuse a default or
# user-named testing domain. This is named by the user via the environment
# variable BOTO_SWF_UNITTEST_DOMAIN, if available. Otherwise the default
# testing domain is literally "boto-swf-unittest-domain". Do not use
# the testing domain for other purposes.
# Name of the SWF domain the tests run in: taken from the environment
# variable BOTO_SWF_UNITTEST_DOMAIN when it is set, otherwise the
# literal default below.
BOTO_SWF_UNITTEST_DOMAIN = os.getenv(
    "BOTO_SWF_UNITTEST_DOMAIN", "boto-swf-unittest-domain")
# A standard domain can have a maximum of 10,000 workflow types and
# activity types, registered or deprecated. Therefore, eventually any
# tests which register new workflow types or activity types would begin
# to fail with LimitExceeded. Instead of generating new workflow types
# and activity types, these tests reuse the existing types.
# The consequence of the limits and inability to delete deprecated
# domains, workflow types, and activity types is that the tests in
# this module will not test for the three register actions:
# * register_domain
# * register_workflow_type
# * register_activity_type
# Instead, the setUp of the TestCase creates a domain, workflow type,
# and activity type, expecting that they may already exist, and the
# tests themselves test other things.
# If you really want to re-test the register_* functions in their
# ability to create things (rather than just reporting that they
# already exist), you'll need to use a new BOTO_SWF_UNITTEST_DOMAIN.
# But, beware that once you hit 100 domains, you cannot create any
# more, delete existing ones, or rename existing ones.
# Some API calls establish resources, but these resources are not instantly
# available to the next API call. For testing purposes, it is necessary to
# have a short pause to avoid having tests fail for invalid reasons.
# The test setUp sleeps for this many seconds after each register_* call
# that succeeds (i.e. whenever something new was actually created).
PAUSE_SECONDS = 4
class SimpleWorkflowLayer1TestBase(unittest.TestCase):
    """
    Shared fixture for the SWF Layer1 tests.

    There are at least two test cases which share this setUp/tearDown
    and the class-based parameter definitions:
    * SimpleWorkflowLayer1Test
    * tests.swf.test_layer1_workflow_execution.SwfL1WorkflowExecutionTest

    setUp opens a Layer1 connection (``self.conn``) and registers the
    test domain, workflow type and activity type, tolerating the case
    where any of them already exists.
    """
    swf = True

    # Some params used throughout the tests...

    # Domain registration params...
    _domain = BOTO_SWF_UNITTEST_DOMAIN
    _workflow_execution_retention_period_in_days = 'NONE'
    _domain_description = 'test workflow domain'

    # Type registration params used for workflow type and activity type...
    _task_list = 'tasklist1'

    # Workflow type registration params...
    _workflow_type_name = 'wft1'
    _workflow_type_version = '1'
    _workflow_type_description = 'wft1 description'
    _default_child_policy = 'REQUEST_CANCEL'
    _default_execution_start_to_close_timeout = '600'
    _workflow_default_task_start_to_close_timeout = '60'

    # Activity type registration params...
    _activity_type_name = 'at1'
    _activity_type_version = '1'
    _activity_type_description = 'at1 description'
    _default_task_heartbeat_timeout = '30'
    _default_task_schedule_to_close_timeout = '90'
    _default_task_schedule_to_start_timeout = '10'
    _activity_default_task_start_to_close_timeout = '30'

    # Historically a single attribute name was assigned twice in this
    # class body ('60' for the workflow type, then '30' for the activity
    # type), so the second value silently won for both registrations.
    # The name is kept, with the value it effectively had, for any code
    # that still reads it.
    _default_task_start_to_close_timeout = \
        _activity_default_task_start_to_close_timeout

    def _register_if_missing(self, register, already_exists, *args, **kwargs):
        """
        Call ``register(*args, **kwargs)``, tolerating "already exists".

        :param register: bound Layer1 register_* method to invoke.
        :param already_exists: exception class that signals the resource
            is already registered; it is swallowed.
        :raises: the test's failure exception if the call succeeds but
            returns something other than None.
        """
        try:
            result = register(*args, **kwargs)
        except already_exists:
            # Nothing new was created, so there is nothing to wait for.
            return
        self.assertTrue(result is None,
                        '%s; expected None on success' % register.__name__)
        # Give the newly created resource time to become visible to the
        # next API call.
        time.sleep(PAUSE_SECONDS)

    def setUp(self):
        # Create a Layer1 connection for testing.
        # Tester needs boto config or keys in environment variables.
        self.conn = Layer1()

        # Register a domain. Expect None (success) or
        # SWFDomainAlreadyExistsError.
        self._register_if_missing(
            self.conn.register_domain,
            swf_exceptions.SWFDomainAlreadyExistsError,
            self._domain,
            self._workflow_execution_retention_period_in_days,
            description=self._domain_description)

        # Register a workflow type. Expect None (success) or
        # SWFTypeAlreadyExistsError.
        self._register_if_missing(
            self.conn.register_workflow_type,
            swf_exceptions.SWFTypeAlreadyExistsError,
            self._domain,
            self._workflow_type_name,
            self._workflow_type_version,
            task_list=self._task_list,
            default_child_policy=self._default_child_policy,
            default_execution_start_to_close_timeout=
                self._default_execution_start_to_close_timeout,
            default_task_start_to_close_timeout=
                self._workflow_default_task_start_to_close_timeout,
            description=self._workflow_type_description)

        # Register an activity type. Expect None (success) or
        # SWFTypeAlreadyExistsError.
        self._register_if_missing(
            self.conn.register_activity_type,
            swf_exceptions.SWFTypeAlreadyExistsError,
            self._domain,
            self._activity_type_name,
            self._activity_type_version,
            task_list=self._task_list,
            default_task_heartbeat_timeout=
                self._default_task_heartbeat_timeout,
            default_task_schedule_to_close_timeout=
                self._default_task_schedule_to_close_timeout,
            default_task_schedule_to_start_timeout=
                self._default_task_schedule_to_start_timeout,
            default_task_start_to_close_timeout=
                self._activity_default_task_start_to_close_timeout,
            description=self._activity_type_description)

    def tearDown(self):
        # Delete what we can...
        # (Currently nothing: the domain and types are deliberately
        # reused across runs.)
        pass
class SimpleWorkflowLayer1Test(SimpleWorkflowLayer1TestBase):
    """
    Tests for the Layer1 list_* calls, run against the domain, workflow
    type and activity type that the base class setUp registers.
    """

    def test_list_domains(self):
        # Find the domain.
        r = self.conn.list_domains('REGISTERED')
        found = None
        for info in r['domainInfos']:
            if info['name'] == self._domain:
                found = info
                break
        self.assertTrue(found is not None,
                        'list_domains; test domain not found')
        # Validate some properties.
        self.assertEqual(found['description'], self._domain_description,
                         'list_domains; description does not match')
        self.assertEqual(found['status'], 'REGISTERED',
                         'list_domains; status does not match')

    def test_list_workflow_types(self):
        # Find the workflow type.
        r = self.conn.list_workflow_types(self._domain, 'REGISTERED')
        found = None
        for info in r['typeInfos']:
            workflow_type = info['workflowType']
            if (workflow_type['name'] == self._workflow_type_name and
                    workflow_type['version'] == self._workflow_type_version):
                found = info
                break
        self.assertTrue(found is not None,
                        'list_workflow_types; test type not found')
        # Validate some properties.
        self.assertEqual(found['description'],
                         self._workflow_type_description,
                         'list_workflow_types; description does not match')
        self.assertEqual(found['status'], 'REGISTERED',
                         'list_workflow_types; status does not match')

    def test_list_activity_types(self):
        # Find the activity type. Match on name AND version, as the
        # workflow type test does, so that another version of the same
        # activity name cannot be picked up by mistake.
        r = self.conn.list_activity_types(self._domain, 'REGISTERED')
        found = None
        for info in r['typeInfos']:
            activity_type = info['activityType']
            if (activity_type['name'] == self._activity_type_name and
                    activity_type['version'] == self._activity_type_version):
                found = info
                break
        self.assertTrue(found is not None,
                        'list_activity_types; test type not found')
        # Validate some properties.
        self.assertEqual(found['description'],
                         self._activity_type_description,
                         'list_activity_types; description does not match')
        self.assertEqual(found['status'], 'REGISTERED',
                         'list_activity_types; status does not match')

    def test_list_closed_workflow_executions(self):
        # Test various legal ways to call function. Only that the calls
        # are accepted is checked; the results are not inspected.
        latest_date = time.time()
        oldest_date = latest_date - 3600
        # With startTimeFilter...
        self.conn.list_closed_workflow_executions(
            self._domain,
            start_latest_date=latest_date, start_oldest_date=oldest_date)
        # With closeTimeFilter...
        self.conn.list_closed_workflow_executions(
            self._domain,
            close_latest_date=latest_date, close_oldest_date=oldest_date)
        # With closeStatusFilter...
        self.conn.list_closed_workflow_executions(
            self._domain,
            close_latest_date=latest_date, close_oldest_date=oldest_date,
            close_status='COMPLETED')
        # With tagFilter...
        self.conn.list_closed_workflow_executions(
            self._domain,
            close_latest_date=latest_date, close_oldest_date=oldest_date,
            tag='ig')
        # With executionFilter...
        self.conn.list_closed_workflow_executions(
            self._domain,
            close_latest_date=latest_date, close_oldest_date=oldest_date,
            workflow_id='ig')
        # With typeFilter...
        self.conn.list_closed_workflow_executions(
            self._domain,
            close_latest_date=latest_date, close_oldest_date=oldest_date,
            workflow_name='ig', workflow_version='ig')
        # With reverseOrder...
        self.conn.list_closed_workflow_executions(
            self._domain,
            close_latest_date=latest_date, close_oldest_date=oldest_date,
            reverse_order=True)

    def test_list_open_workflow_executions(self):
        # Test various legal ways to call function. Only that the calls
        # are accepted is checked; the results are not inspected.
        # NOTE: this test used to call list_closed_workflow_executions,
        # which left list_open_workflow_executions unexercised.
        latest_date = time.time()
        oldest_date = latest_date - 3600
        # With required params only...
        self.conn.list_open_workflow_executions(
            self._domain, latest_date, oldest_date)
        # With tagFilter...
        self.conn.list_open_workflow_executions(
            self._domain, latest_date, oldest_date, tag='ig')
        # With executionFilter...
        self.conn.list_open_workflow_executions(
            self._domain, latest_date, oldest_date, workflow_id='ig')
        # With typeFilter...
        self.conn.list_open_workflow_executions(
            self._domain, latest_date, oldest_date,
            workflow_name='ig', workflow_version='ig')
        # With reverseOrder...
        self.conn.list_open_workflow_executions(
            self._domain, latest_date, oldest_date, reverse_order=True)
|