File: decision_task.rb

package info (click to toggle)
ruby-aws-sdk 1.67.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 6,840 kB
  • sloc: ruby: 28,436; makefile: 7
file content (617 lines) | stat: -rw-r--r-- 22,557 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
# Copyright 2011-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

module AWS
  class SimpleWorkflow

    # ## Getting Decision Tasks
    #
    # You can use #poll or #poll_for_single_task on {DecisionTaskCollection}
    # to grab a decision task:
    #
    #     # get a single task
    #     if task = domain.decision_tasks.poll_for_single_task('task-list')
    #       # make decisions
    #       task.complete!
    #     end
    #
    # See {DecisionTaskCollection} for more information on getting and
    # counting decision tasks.
    #
    # ## Exploring Event History
    #
    # Once you have a decision task you can examine the event history.
    # This can give you the information you need to make decisions.
    # The {#events} and {#new_events} methods enumerate through
    # all events or events since the last decision.
    #
    #     decision_task.new_events.each do |event|
    #       # inspect the #event_type and #attributes
    #     end
    #
    # Check out {HistoryEvent} for more information on working with
    # events.
    #
    # ## Making Decisions
    #
    # Based on the history of events, you should make decisions by calling
    # methods listed below.  You can call each method as many times as
    # you wish, until you have completed the decision task.
    #
    #  * {#schedule_activity_task}
    #  * {#request_cancel_activity_task}
    #
    #  * {#complete_workflow_execution}
    #  * {#cancel_workflow_execution}
    #  * {#fail_workflow_execution}
    #  * {#continue_as_new_workflow_execution}
    #
    #  * {#record_marker}
    #  * {#start_timer}
    #  * {#cancel_timer}
    #
    #  * {#signal_external_workflow_execution}
    #  * {#request_cancel_external_workflow_execution}
    #  * {#start_child_workflow_execution}
    #
    # The decision methods are grouped above by concern.
    #
    # ## Completing the Decision Task
    #
    # Once you have finished adding decisions to the task, you need to
    # complete it.  If you called {DecisionTaskCollection#poll} or
    # {DecisionTaskCollection#poll_for_single_task} with a block
    # argument then the decision will be completed automatically at the
    # end of the block.
    #
    #     domain.decision_tasks.poll do |decision_task|
    #       # ...
    #     end #=> the decision task is closed here
    #
    # If you get a task from {DecisionTaskCollection#poll_for_single_task}
    # without a block, then it is your responsibility to call {#complete!}
    # on the decision task.  If you fail to do this before the
    # task start to close timeout, then a decisionTaskTimedOut event
    # will be added to the workflow execution history.
    #
    class DecisionTask

      include Core::Model
      include OptionFormatters

      # @api private
      def initialize domain, request_options, data

        @domain = domain

        @request_options = request_options

        @task_token = data['taskToken']

        workflow_id = data['workflowExecution']['workflowId']
        run_id = data['workflowExecution']['runId']
        @workflow_execution = WorkflowExecution.new(domain, workflow_id, run_id)

        name = data['workflowType']['name']
        version = data['workflowType']['version']
        @workflow_type = WorkflowType.new(domain, name, version)

        @previous_started_event_id = data['previousStartedEventId']

        @started_event_id = data['startedEventId']

        @next_token = data['nextPageToken']

        @events = data['events']

        @decisions = []

        super

      end

      # @return [String] The decision task identifier.
      attr_reader :task_token

      # @return [Domain]
      attr_reader :domain

      # @return [WorkflowExecution]
      attr_reader :workflow_execution

      # @return [WorkflowType]
      attr_reader :workflow_type

      # @return [Integer] The id of the DecisionTaskStarted event of the
      #   previous decision task of this workflow execution that was
      #   processed by the decider. This can be used to determine the new
      #   events in the history new since the last decision task received
      #   by the decider.
      attr_reader :previous_started_event_id

      # @return [Integer] The id of the DecisionTaskStarted event recorded
      #   in the history.
      attr_reader :started_event_id

      # @return [String,nil] Returns a value if the results are paginated.
      #   Normally you do not need this value, as {#events} will enumerate
      #   all events, making requests as necessary to get more.
      attr_reader :next_token

      # @return [Enumerable] Returns an enumerable collection of all events
      #   for the workflow execution.
      def events
        enum_for(:_events)
      end

      # @return [Enumerable] Returns an enumerable collection of only the
      #   new events for workflow execution (since the last decision).
      def new_events
        enum_for(:_new_events)
      end

      # @param [Hash] options
      #
      # @option options [String] :execution_context (nil)  A user-defined
      #   context to add to the workflow execution.  You can fetch this
      #   later with {WorkflowExecution#latest_execution_context}.
      #
      # @return [nil]
      #
      def complete! options = {}

        raise 'already responded' if responded?

        options[:task_token] = task_token
        options[:decisions] = @decisions unless @decisions.empty?

        client.respond_decision_task_completed(options)

        @responded = true

        nil

      end

      # @return [Boolean] Returns true if {#complete!} has been called on
      #   this decision task.
      def responded?
        !!@responded
      end

      # Schedules an activity task.
      #
      # @note This adds a decision to this task that is finalized when you
      #   call {#complete!}.
      #
      # @param [ActivityType,Hash] activity_type The type of activity
      #   to schedule.  `activity_type` must be an {ActivityType} object
      #   or a hash with `:name` and `:version`.
      #
      # @param [Hash] options
      #
      # @option options [String] :control (nil) Optional data attached to
      #   the event that can be used by the decider in subsequent workflow
      #   tasks. This data is not sent to the activity.
      #
      # @option options [Integer,:none] :heartbeat_timeout (nil)
      #   The maximum time before which a worker processing a task
      #   of this type must report progress.  If the timeout is exceeded,
      #   the activity task is automatically timed out. If the worker
      #   subsequently attempts to record a heartbeat or returns a
      #   result, it will be ignored. This default can be overridden when
      #   scheduling an activity task.
      #
      #   The value should be a number of seconds (integer) or the symbol
      #   `:none` (implying no timeout).
      #
      # @option options [String] :input (nil) Input provided to the
      #   activity task.
      #
      # @option options [Integer,:none] :schedule_to_close_timeout (nil)
      #   The maximum duration that a task of this activity type
      #   can wait before being assigned to a worker.
      #
      #   A schedule-to-close timeout for this activity task must be
      #   specified either as a default for the activity type or through
      #   this option. If neither this field is set nor a default was
      #   specified at registration time then a fault will be returned.
      #
      #   The value should be a number of seconds (integer) or the symbol
      #   `:none` (implying no timeout).
      #
      # @option options [Integer,:none] :schedule_to_start_timeout (nil)
      #   The maximum duration that a task of this activity type
      #   can wait before being assigned to a worker.  This overrides the
      #   default timeout specified when registering the activity type.
      #
      #   The value should be a number of seconds (integer) or the symbol
      #   `:none` (implying no timeout).
      #
      # @option options [Integer,:none] :start_to_close_timeout (nil)
      #   The maximum duration that a worker can take to process
      #   tasks of this activity type.  This overrides the default
      #   timeout specified when registering the activity type.
      #
      #   The value should be a number of seconds (integer) or the symbol
      #   `:none` (implying no timeout).
      #
      # @option options [String] :task_list (nil)
      #   If set, specifies the name of the task list in which to schedule
      #   the activity task. If not specified, the default task list
      #   registered with the activity type will be used.
      #
      # @option options [Integer] :task_priority (nil)
      #   If set, specifies the value of the task priority at which to
      #   schedule the activity task. If not specified, the default task
      #   priority registered with the activity type will be used.
      #
      # @return [nil]
      #
      def schedule_activity_task activity_type, options = {}

        options[:activity_id] ||= SecureRandom.uuid

        options[:activity_type] = case activity_type
        when Hash
          unless
            activity_type[:name].is_a?(String) and
            activity_type[:version].is_a?(String) and
            activity_type.keys.length == 2
          then
            msg = 'activity_type hash must have :name and :version strings'
            raise ArgumentError, msg
          end
          activity_type
        when ActivityType
          { :name => activity_type.name, :version => activity_type.version }
        else
          msg = 'expected activity_type to be an ActivityType object or a hash'
          raise ArgumentError, msg
        end

        duration_opts(options,
          :heartbeat_timeout,
          :schedule_to_close_timeout,
          :schedule_to_start_timeout,
          :start_to_close_timeout)

        if priority = options[:task_priority]
          options[:task_priority] = priority.to_s
        end

        if task_list = options[:task_list]
          options[:task_list] = { :name => task_list }
        end

        add_decision :schedule_activity_task, options

      end

      # Attempts to cancel a previously scheduled activity task. If the
      # activity task was scheduled but has not been assigned to a worker,
      # then it will be canceled. If the activity task was already assigned
      # to a worker, then the worker will be informed that cancellation has
      # been requested when recording the activity task heartbeat.
      #
      # @param [ActivityTask,String] activity_or_activity_id An {ActivityTask}
      #   object or the activity_id of an activity task to request
      #   cancellation for.
      #
      # @return [nil]
      #
      def request_cancel_activity_task activity_or_activity_id

        id = activity_or_activity_id.is_a?(ActivityTask) ?
          activity_or_activity_id.activity_id :
          activity_or_activity_id

        add_decision :request_cancel_activity_task, { :activity_id => id }

      end

      # Closes the workflow execution and records a
      # WorkflowExecutionCompleted event in the history.
      #
      # @param [Hash] options
      #
      # @option options [String] :result (nil) The results of the workflow
      #   execution.
      #
      # @return [nil]
      #
      def complete_workflow_execution options = {}
        add_decision :complete_workflow_execution, options
      end

      # Closes the workflow execution and records a WorkflowExecutionFailed
      # event in the history.
      #
      # @param [Hash] options
      #
      # @option options [String] :reason (nil) Reason for the failure.
      #
      # @option options [String] :details (nil) Optional details of the failure.
      #
      # @return [nil]
      #
      def fail_workflow_execution options = {}
        add_decision :fail_workflow_execution, options
      end

      # Closes the workflow execution and records a
      # WorkflowExecutionCanceled event in the history.
      #
      # @param [Hash] options
      #
      # @option options [String] :details (nil) Optional details of the
      #   cancellation.
      #
      # @return [nil]
      #
      def cancel_workflow_execution options = {}
        add_decision :cancel_workflow_execution, options
      end

      # Closes the workflow execution and starts a new workflow execution
      # of the same type using the same workflow id and a unique run Id.
      # A WorkflowExecutionContinuedAsNew event is recorded in the history.
      #
      # @option options [String] :input (nil)
      #   The input for the workflow execution. This is a free form string
      #   which should be meaningful to the workflow you are starting.
      #   This input is made available to the new workflow execution in the
      #   WorkflowExecutionStarted history event.
      #
      # @option options [Array<string>] :tag_list ([])
      #   A list of tags (strings) to associate with the workflow execution.
      #   You can specify a maximum of 5 tags.
      #
      # @option options [Symbol] :child_policy (nil)
      #   Specifies the policy to use for the child workflow executions of
      #   this workflow execution if it is terminated explicitly or due to
      #   an expired timeout. This policy overrides the default child policy
      #   specified when registering the workflow type.  The supported child
      #   policies are:
      #
      #   * `:terminate` - the child executions will be terminated.
      #
      #   * `:request_cancel` - a request to cancel will be attempted for each
      #     child execution by recording a WorkflowExecutionCancelRequested
      #     event in its history. It is up to the decider to take appropriate
      #     actions when it receives an execution history with this event.
      #
      #   * `:abandon` - no action will be taken. The child executions will
      #     continue to run.
      #
      # @option options [Integer,:none] :execution_start_to_close_timeout (nil)
      #   The total duration for this workflow execution.  This overrides
      #   the default specified when registering the workflow type.
      #
      #   The value should be a number of seconds (integer) or the symbol
      #   `:none` (implying no timeout).
      #
      # @option options [String] :task_list (nil)
      #   The task list to use for the decision tasks generated for this
      #   workflow execution. This overrides the default task list specified
      #   when registering the workflow type.
      #
      # @option options [Integer] :task_priority (nil)
      #   The task priority to use for the decision tasks generated for this
      #   workflow execution. This overrides the default task priority specified
      #   when registering the workflow type.
      #
      # @option options [Integer,:none] :task_start_to_close_timeout (nil)
      #   Specifies the maximum duration of decision tasks for this
      #   workflow execution. This parameter overrides the default
      #   specified when the workflow type was registered.
      #
      #   The value should be a number of seconds (integer) or the symbol
      #   `:none` (implying no timeout).
      #
      # @return [nil]
      #
      def continue_as_new_workflow_execution options = {}
        start_execution_opts(options)
        add_decision :continue_as_new_workflow_execution, options
      end

      # Records a MarkerRecorded event in the history. Markers can be used
      # for adding custom information in the history for instance to let
      # deciders know that they do not need to look at the history beyond
      # the marker event.
      #
      # @param [String] marker_name The name of the marker.
      #
      # @param [Hash] options
      #
      # @option options [String] :details (nil) Optional details of the marker.
      #
      # @return [nil]
      #
      def record_marker marker_name, options = {}
        add_decision :record_marker, options.merge(:marker_name => marker_name)
      end

      # Schedules a timer for this workflow execution and records a
      # TimerScheduled event in the history. This timer will fire after
      # the specified delay and record a TimerFired event.
      #
      # @param [Integer] start_to_fire_timeout (nil) The duration to wait
      #   before firing the timer.
      #
      # @param [Hash] options
      #
      # @option options [String] :control (nil) Optional data attached to
      #   the event that can be used by the decider in subsequent workflow
      #   tasks.
      #
      # @option options [String] :timer_id (nil) Unique id for the timer.
      #   If you do not pass this option, a UUID will be generated.
      #
      # @return [String] Returns the id of the timer.
      #
      def start_timer start_to_fire_timeout, options = {}
        options[:timer_id] ||= SecureRandom.uuid
        options[:start_to_fire_timeout] = start_to_fire_timeout
        duration_opts(options, :start_to_fire_timeout)
        add_decision :start_timer, options
        options[:timer_id]
      end

      # Cancels a previously scheduled timer and records a TimerCanceled
      # event in the history.
      #
      # @param [String] timer_id The id of the timer to cancel.
      #
      # @return [nil]
      #
      def cancel_timer timer_id
        add_decision :cancel_timer, { :timer_id => timer_id }
      end

      # Requests a signal to be delivered to the specified external workflow
      # execution and records a SignalExternalWorkflowExecutionRequested
      # event in the history.
      #
      # @param [WorkflowExecution,String] workflow_execution
      #
      # @param [String] signal_name
      #
      # @param [Hash] options
      #
      # @option options [String] :control (nil) Optional data attached to
      #   the event that can be used by the decider in subsequent decision
      #   tasks.
      #
      # @option options [String] :input (nil) Optional input to be provided
      #   with the signal.  The target workflow execution will use the
      #   signal name and input to process the signal.
      #
      # @return [nil]
      #
      def signal_external_workflow_execution workflow_execution, signal_name, options = {}
        workflow_execution_opts(options, workflow_execution)
        options[:signal_name] = signal_name
        add_decision :signal_external_workflow_execution, options
      end

      # Requests that a request be made to cancel the specified external
      # workflow execution and records a
      # RequestCancelExternalWorkflowExecutionRequested event in
      # the history.
      #
      # @return [nil]
      #
      def request_cancel_external_workflow_execution workflow_execution, options = {}
        workflow_execution_opts(options, workflow_execution)
        add_decision :request_cancel_external_workflow_execution, options
      end

      # Requests that a child workflow execution be started and records a
      # StartChildWorkflowExecutionRequested event in the history.
      # The child workflow execution is a separate workflow execution with
      # its own history.
      #
      # @param [WorkflowType,Hash] workflow_type (nil) The type of
      #   workflow execution to start.  This should be a {WorkflowType} object
      #   or a hash with the keys `:name` and `:version`.
      #
      # @param [Hash] options
      #
      # @option (see WorkflowType#start_execution)
      #
      # @option options [String] :control (nil) Optional data attached to
      #   the event that can be used by the decider in subsequent workflow
      #   tasks.
      #
      # @return [String] Returns the workflow id of the new execution.
      #
      def start_child_workflow_execution workflow_type, options = {}
        start_execution_opts(options, workflow_type)
        add_decision :start_child_workflow_execution, options
        options[:workflow_id]
      end

      protected
      def workflow_execution_opts options, workflow_execution

       if workflow_execution.is_a?(WorkflowExecution)
          options[:workflow_id] = workflow_execution.workflow_id
          options[:run_id] = workflow_execution.run_id
        elsif
          workflow_execution.is_a?(Hash) and
          workflow_execution[:workflow_id].is_a?(String) and
          workflow_execution[:run_id].is_a?(String) and
          workflow_execution.keys.length == 2
        then
          options.merge!(workflow_execution)
        elsif workflow_execution.is_a?(String)
          options[:workflow_id] = workflow_execution
        else
          msg = 'expected workflow_execution to be a WorkflowExecution ' +
            'object or workflow id or a hash with :workflow_id and :run_id'
          raise ArgumentError, msg
        end

      end

      protected
      def add_decision decision_type, attributes
        @decisions << {
          :decision_type => Core::Inflection.class_name(decision_type.to_s),
          :"#{decision_type}_decision_attributes" => attributes,
        }
        nil
      end

      protected
      def _new_events &block
        _events do |event|
          yield(event) if event.new?
        end
      end

      protected
      def _events &block

        _each_event(@events, &block)
        next_token = self.next_token

        while next_token

          options = @request_options.merge(:next_page_token => next_token)
          response = client.poll_for_decision_task(options)

          _each_event(response.data['events'], &block)
          next_token = response.data['nextPageToken']

        end

      end

      def _each_event events, &block
        prev_event_id = self.previous_started_event_id
        events.each do |description|
          event = HistoryEvent.new(workflow_execution, description)
          Core::MetaUtils.extend_method(event, :new?) do
            prev_event_id.nil? or self.event_id > prev_event_id
          end
          yield(event)
        end
      end


    end
  end
end