File: activity_task.rb

package info (click to toggle)
ruby-aws-sdk 1.67.0-3
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 6,840 kB
  • sloc: ruby: 28,436; makefile: 7
file content (178 lines) | stat: -rw-r--r-- 5,645 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# Copyright 2011-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

module AWS
  class SimpleWorkflow
    class ActivityTask

      # Raised by {ActivityTask#record_heartbeat!} when this activity
      # task has received a cancellation request.
      class CancelRequestedError < StandardError; end

      include Core::Model

      # @api private
      def initialize domain, data, options = {}

        @domain = domain

        @task_token = data['taskToken']
        @activity_id = data['activityId']
        @started_event_id = data['startedEventId']
        @input = data['input']

        name = data['activityType']['name']
        version = data['activityType']['version']
        @activity_type = domain.activity_types[name,version]

        workflow_id = data['workflowExecution']['workflowId']
        run_id = data['workflowExecution']['runId']
        @workflow_execution = domain.workflow_executions[workflow_id,run_id]

        super

      end

      # @return [String] The opaque string used as a handle on the task.
      attr_reader :task_token

      # @return [String] The unique identifier of this task.
      attr_reader :activity_id

      # @return [Domain] The domain this task was scheduled in.
      attr_reader :domain

      # @return [Integer]
      #   The id of the {ActivityTaskStarted} event recorded in the history.
      attr_reader :started_event_id

      # @return [String,nil] The input provided when the activity task was
      #   scheduled.
      attr_reader :input

      # @return [ActivityType]
      attr_reader :activity_type

      # @return [WorkflowExecution]
      attr_reader :workflow_execution

      # Reports to the service that the activity task is progressing.
      #
      # You can optionally specify `:details` that describe the progress.
      # This might be a percentage competition, step number, etc.
      #
      #     activity_task.record_heartbeat! :details => '.75' # 75% complete
      #
      # If the activity task has been canceled since it was received or
      # since the last recorded heartbeat, this method will raise
      # a CancelRequestedError.
      #
      # If you are processing the activity task inside a block passed
      # to one of the polling methods in {ActivityTaskCollection}
      # then untrapped CancelRequestedErrors are caught
      # and responded to automatically.
      #
      #     domain.activity_tasks.poll('task-list') do |task|
      #       task.record_heartbeat! # raises CancelRequestedError
      #     end # traps the error and responds activity task canceled.
      #
      # If you need to cleanup or provide additional details in the
      # cancellation response, you can trap the error and
      # respond manually.
      #
      #     domain.activity_tasks.poll('task-list') do |task|
      #       task.record_heartbeat! # raises CancelRequestedError
      #     rescue CancelRequestedError => e
      #        # cleanup
      #        task.respond_canceled! :details => '...'
      #     end
      #
      # @param [Hash] options
      #
      # @option options [String] :details (nil)
      #   If specified, contains details about the progress of the task.
      #
      def record_heartbeat! options = {}

        client_opts = {}
        client_opts[:task_token] = task_token
        client_opts[:details] = options[:details] if options[:details]

        response = client.record_activity_task_heartbeat(client_opts)

        raise CancelRequestedError if response.data['cancelRequested']

        nil

      end

      # @param [Hash] options
      #
      # @option options [String] :result (nil)
      #
      # @return [nil]
      #
      def complete! options = {}
        respond :completed, options
      end

      # @param [Hash] options
      #
      # @option options [String] :details (nil)
      #
      # @return [nil]
      #
      def cancel! options = {}
        respond :canceled, options
      end

      # @param [Hash] options
      #
      # @option options [String] :details (nil)
      #
      # @option options [String] :reason (nil)
      #
      # @return [nil]
      #
      def fail! options = {}
        respond :failed, options
      end

      def responded?
        !!@responded
      end

      # Responds to one of the `respond_activity_task_` methods with a set of options. This method is called when any
      # of the {#complete!}, {#cancel!}, or {#fail!} methods is invoked.
      #
      # @note Only one response can be logged per `ActivityTask` instance; If this task has already logged a response,
      #   `respond` will raise an exception.
      #
      # @param [String] status
      #   The status of the response: "canceled", "completed", or "failed".
      #
      # @param [Hash] options
      #   Options to provide to the respond_activity_task function that will be called.
      #
      protected
      def respond status, options
        raise "already responded" if responded?
        @responded = status
        options[:task_token] = task_token
        client.send("respond_activity_task_#{status}", options)
        nil
      end

    end
  end
end