File: test_layer1.py

package info (click to toggle)
python-boto 2.34.0-2
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 8,584 kB
  • ctags: 10,521
  • sloc: python: 78,553; makefile: 123
file content (246 lines) | stat: -rw-r--r-- 10,743 bytes parent folder | download | duplicates (14)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""
Tests for Layer1 of Simple Workflow

"""
import os
import unittest
import time

from boto.swf.layer1 import Layer1
from boto.swf import exceptions as swf_exceptions



# AWS allows a standard account at most 100 SWF domains, counting both
# registered and deprecated ones, and deleting deprecated domains on demand
# does not appear possible.  These tests therefore reuse a single testing
# domain instead of creating new ones.  Set the environment variable
# BOTO_SWF_UNITTEST_DOMAIN to choose its name; when the variable is unset
# the domain is literally "boto-swf-unittest-domain".  Do not use the
# testing domain for other purposes.
BOTO_SWF_UNITTEST_DOMAIN = os.getenv(
    "BOTO_SWF_UNITTEST_DOMAIN", "boto-swf-unittest-domain")

# A standard domain can have a maximum of 10,000 workflow types and
# activity types, registered or deprecated.  Therefore, eventually any
# tests which register new workflow types or activity types would begin
# to fail with LimitExceeded.  Instead of generating new workflow types
# and activity types, these tests reuse the existing types.

# The consequence of the limits and inability to delete deprecated
# domains, workflow types, and activity types is that the tests in
# this module will not test for the three register actions:
#    * register_domain
#    * register_workflow_type
#    * register_activity_type
# Instead, the setUp of the TestCase creates a domain, workflow type,
# and activity type, expecting that they may already exist, and the
# tests themselves test other things.

# If you really want to re-test the register_* functions in their
# ability to create things (rather than just reporting that they
# already exist), you'll need to use a new BOTO_SWF_UNITTEST_DOMAIN.
# But beware that once you hit 100 domains, you cannot create any
# more, delete existing ones, or rename existing ones.

# Some API calls establish resources, but these resources are not instantly
# available to the next API call.  For testing purposes, it is necessary to
# have a short pause to avoid having tests fail for invalid reasons.
# The value is a number of seconds; setUp passes it to time.sleep() after
# each register_* call that actually created something.
PAUSE_SECONDS = 4



class SimpleWorkflowLayer1TestBase(unittest.TestCase):
    """
    Shared fixture for the SWF Layer1 integration tests.

    setUp opens a Layer1 connection and makes sure the testing domain,
    workflow type and activity type exist: each one is registered, and an
    "already exists" error from the service is treated as success.
    Nothing is removed in tearDown.

    There are at least two test cases which share this setUp/tearDown
    and the class-based parameter definitions:
        * SimpleWorkflowLayer1Test
        * tests.swf.test_layer1_workflow_execution.SwfL1WorkflowExecutionTest
    """
    # NOTE(review): presumably a marker attribute used by the test runner to
    # select the SWF tests -- confirm against the runner configuration.
    swf = True
    # Some params used throughout the tests...
    # Domain registration params...
    _domain = BOTO_SWF_UNITTEST_DOMAIN
    _workflow_execution_retention_period_in_days = 'NONE'
    _domain_description = 'test workflow domain'
    # Type registration params used for workflow type and activity type...
    _task_list = 'tasklist1'
    # Workflow type registration params...
    _workflow_type_name = 'wft1'
    _workflow_type_version = '1'
    _workflow_type_description = 'wft1 description'
    _default_child_policy = 'REQUEST_CANCEL'
    _default_execution_start_to_close_timeout = '600'
    # The workflow and activity task timeouts used to share one attribute
    # name, so the activity value ('30') silently replaced this one.  They
    # now have separate names so each registration gets its own value.
    _workflow_default_task_start_to_close_timeout = '60'
    # Activity type registration params...
    _activity_type_name = 'at1'
    _activity_type_version = '1'
    _activity_type_description = 'at1 description'
    _default_task_heartbeat_timeout = '30'
    _default_task_schedule_to_close_timeout = '90'
    _default_task_schedule_to_start_timeout = '10'
    _activity_default_task_start_to_close_timeout = '30'
    # Legacy name, kept so existing subclasses that read it keep working.
    # It has always resolved to the activity value.
    _default_task_start_to_close_timeout = (
        _activity_default_task_start_to_close_timeout)

    def _register_if_missing(self, what, exists_error, register,
                             *args, **kwargs):
        """
        Call ``register(*args, **kwargs)``, tolerating an existing resource.

        :param what: name of the API call, used in the failure message.
        :param exists_error: exception class that means the resource is
            already registered; it is swallowed.
        :param register: bound Layer1 register_* method to invoke.

        Any other exception propagates.  On a successful registration the
        response must be None, and the test then pauses PAUSE_SECONDS so
        the new resource is visible to the next API call.
        """
        try:
            response = register(*args, **kwargs)
        except exists_error:
            # Already registered by an earlier run; nothing to wait for.
            return
        # Use a TestCase assertion rather than a bare ``assert`` so the
        # check survives ``python -O`` and reports a useful message.
        self.assertTrue(
            response is None,
            '%s; expected an empty (None) response, got %r'
            % (what, response))
        time.sleep(PAUSE_SECONDS)

    def setUp(self):
        """Connect to SWF and ensure the domain and both types exist."""
        # Create a Layer1 connection for testing.
        # Tester needs boto config or keys in environment variables.
        self.conn = Layer1()

        # Register a domain.  Expect None (success) or
        # SWFDomainAlreadyExistsError.
        self._register_if_missing(
            'register_domain',
            swf_exceptions.SWFDomainAlreadyExistsError,
            self.conn.register_domain,
            self._domain,
            self._workflow_execution_retention_period_in_days,
            description=self._domain_description)

        # Register a workflow type.  Expect None (success) or
        # SWFTypeAlreadyExistsError.
        self._register_if_missing(
            'register_workflow_type',
            swf_exceptions.SWFTypeAlreadyExistsError,
            self.conn.register_workflow_type,
            self._domain,
            self._workflow_type_name,
            self._workflow_type_version,
            task_list=self._task_list,
            default_child_policy=self._default_child_policy,
            default_execution_start_to_close_timeout=
                self._default_execution_start_to_close_timeout,
            default_task_start_to_close_timeout=
                self._workflow_default_task_start_to_close_timeout,
            description=self._workflow_type_description)

        # Register an activity type.  Expect None (success) or
        # SWFTypeAlreadyExistsError.
        self._register_if_missing(
            'register_activity_type',
            swf_exceptions.SWFTypeAlreadyExistsError,
            self.conn.register_activity_type,
            self._domain,
            self._activity_type_name,
            self._activity_type_version,
            task_list=self._task_list,
            default_task_heartbeat_timeout=
                self._default_task_heartbeat_timeout,
            default_task_schedule_to_close_timeout=
                self._default_task_schedule_to_close_timeout,
            default_task_schedule_to_start_timeout=
                self._default_task_schedule_to_start_timeout,
            default_task_start_to_close_timeout=
                self._activity_default_task_start_to_close_timeout,
            description=self._activity_type_description)

    def tearDown(self):
        """Intentionally a no-op: the registered resources are reused."""
        # Delete what we can...
        pass




class SimpleWorkflowLayer1Test(SimpleWorkflowLayer1TestBase):
    """
    Exercise the Layer1 list_* calls against the shared testing domain.

    The domain, workflow type and activity type looked up here are the
    ones the base class setUp registers (or finds already registered).

    NOTE(review): the lookups below inspect only the response returned by
    a single list call; if the service paginates and the test resources
    fall on a later page they would be reported as missing -- confirm.
    """

    def test_list_domains(self):
        """The testing domain is listed as REGISTERED with its description."""
        # Find the domain.
        r = self.conn.list_domains('REGISTERED')
        found = None
        for info in r['domainInfos']:
            if info['name'] == self._domain:
                found = info
                break
        self.assertNotEqual(found, None, 'list_domains; test domain not found')
        # Validate some properties.
        self.assertEqual(found['description'], self._domain_description,
                         'list_domains; description does not match')
        self.assertEqual(found['status'], 'REGISTERED',
                         'list_domains; status does not match')

    def test_list_workflow_types(self):
        """The test workflow type (name and version) is listed."""
        # Find the workflow type.
        r = self.conn.list_workflow_types(self._domain, 'REGISTERED')
        found = None
        for info in r['typeInfos']:
            workflow_type = info['workflowType']
            if (workflow_type['name'] == self._workflow_type_name and
                    workflow_type['version'] == self._workflow_type_version):
                found = info
                break
        self.assertNotEqual(found, None,
                            'list_workflow_types; test type not found')
        # Validate some properties.
        self.assertEqual(found['description'], self._workflow_type_description,
                         'list_workflow_types; description does not match')
        self.assertEqual(found['status'], 'REGISTERED',
                         'list_workflow_types; status does not match')

    def test_list_activity_types(self):
        """The test activity type (name and version) is listed."""
        # Find the activity type.
        r = self.conn.list_activity_types(self._domain, 'REGISTERED')
        found = None
        for info in r['typeInfos']:
            activity_type = info['activityType']
            # Match on version as well as name, as the workflow type test
            # does, so another version of the same type is not picked up.
            if (activity_type['name'] == self._activity_type_name and
                    activity_type['version'] == self._activity_type_version):
                found = info
                break
        self.assertNotEqual(found, None,
                            'list_activity_types; test type not found')
        # Validate some properties.
        self.assertEqual(found['description'], self._activity_type_description,
                         'list_activity_types; description does not match')
        self.assertEqual(found['status'], 'REGISTERED',
                         'list_activity_types; status does not match')

    def test_list_closed_workflow_executions(self):
        """Each supported filter combination is accepted by the service.

        Only the absence of an exception is checked; the responses are
        not inspected.
        """
        # Test various legal ways to call function.
        latest_date = time.time()
        oldest_date = latest_date - 3600
        # With startTimeFilter...
        self.conn.list_closed_workflow_executions(self._domain,
            start_latest_date=latest_date, start_oldest_date=oldest_date)
        # With closeTimeFilter...
        self.conn.list_closed_workflow_executions(self._domain,
            close_latest_date=latest_date, close_oldest_date=oldest_date)
        # With closeStatusFilter...
        self.conn.list_closed_workflow_executions(self._domain,
            close_latest_date=latest_date, close_oldest_date=oldest_date,
            close_status='COMPLETED')
        # With tagFilter...
        self.conn.list_closed_workflow_executions(self._domain,
            close_latest_date=latest_date, close_oldest_date=oldest_date,
            tag='ig')
        # With executionFilter...
        self.conn.list_closed_workflow_executions(self._domain,
            close_latest_date=latest_date, close_oldest_date=oldest_date,
            workflow_id='ig')
        # With typeFilter...
        self.conn.list_closed_workflow_executions(self._domain,
            close_latest_date=latest_date, close_oldest_date=oldest_date,
            workflow_name='ig', workflow_version='ig')
        # With reverseOrder...
        self.conn.list_closed_workflow_executions(self._domain,
            close_latest_date=latest_date, close_oldest_date=oldest_date,
            reverse_order=True)

    def test_list_open_workflow_executions(self):
        """Each supported filter combination is accepted by the service.

        Only the absence of an exception is checked; the responses are
        not inspected.
        """
        # Test various legal ways to call function.
        latest_date = time.time()
        oldest_date = latest_date - 3600
        # The time bounds are passed by keyword so the calls do not depend
        # on the positional order of the two date parameters.
        # With required params only...
        self.conn.list_open_workflow_executions(self._domain,
            oldest_date=oldest_date, latest_date=latest_date)
        # With tagFilter...
        self.conn.list_open_workflow_executions(self._domain,
            oldest_date=oldest_date, latest_date=latest_date, tag='ig')
        # With executionFilter...
        self.conn.list_open_workflow_executions(self._domain,
            oldest_date=oldest_date, latest_date=latest_date,
            workflow_id='ig')
        # With typeFilter...
        self.conn.list_open_workflow_executions(self._domain,
            oldest_date=oldest_date, latest_date=latest_date,
            workflow_name='ig', workflow_version='ig')
        # With reverseOrder...
        self.conn.list_open_workflow_executions(self._domain,
            oldest_date=oldest_date, latest_date=latest_date,
            reverse_order=True)