1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
# Copyright 2011-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
module AWS
class SimpleWorkflow
class ActivityTask
# Raised by {ActivityTask#record_heartbeat!} when this activity
# task has received a cancellation request.
class CancelRequestedError < StandardError; end
include Core::Model
# @api private
def initialize domain, data, options = {}
@domain = domain
@task_token = data['taskToken']
@activity_id = data['activityId']
@started_event_id = data['startedEventId']
@input = data['input']
name = data['activityType']['name']
version = data['activityType']['version']
@activity_type = domain.activity_types[name,version]
workflow_id = data['workflowExecution']['workflowId']
run_id = data['workflowExecution']['runId']
@workflow_execution = domain.workflow_executions[workflow_id,run_id]
super
end
# @return [String] The opaque string used as a handle on the task.
attr_reader :task_token
# @return [String] The unique identifier of this task.
attr_reader :activity_id
# @return [Domain] The domain this task was scheduled in.
attr_reader :domain
# @return [Integer]
# The id of the {ActivityTaskStarted} event recorded in the history.
attr_reader :started_event_id
# @return [String,nil] The input provided when the activity task was
# scheduled.
attr_reader :input
# @return [ActivityType]
attr_reader :activity_type
# @return [WorkflowExecution]
attr_reader :workflow_execution
# Reports to the service that the activity task is progressing.
#
# You can optionally specify `:details` that describe the progress.
# This might be a percentage competition, step number, etc.
#
# activity_task.record_heartbeat! :details => '.75' # 75% complete
#
# If the activity task has been canceled since it was received or
# since the last recorded heartbeat, this method will raise
# a CancelRequestedError.
#
# If you are processing the activity task inside a block passed
# to one of the polling methods in {ActivityTaskCollection}
# then untrapped CancelRequestedErrors are caught
# and responded to automatically.
#
# domain.activity_tasks.poll('task-list') do |task|
# task.record_heartbeat! # raises CancelRequestedError
# end # traps the error and responds activity task canceled.
#
# If you need to cleanup or provide additional details in the
# cancellation response, you can trap the error and
# respond manually.
#
# domain.activity_tasks.poll('task-list') do |task|
# task.record_heartbeat! # raises CancelRequestedError
# rescue CancelRequestedError => e
# # cleanup
# task.respond_canceled! :details => '...'
# end
#
# @param [Hash] options
#
# @option options [String] :details (nil)
# If specified, contains details about the progress of the task.
#
def record_heartbeat! options = {}
client_opts = {}
client_opts[:task_token] = task_token
client_opts[:details] = options[:details] if options[:details]
response = client.record_activity_task_heartbeat(client_opts)
raise CancelRequestedError if response.data['cancelRequested']
nil
end
# @param [Hash] options
#
# @option options [String] :result (nil)
#
# @return [nil]
#
def complete! options = {}
respond :completed, options
end
# @param [Hash] options
#
# @option options [String] :details (nil)
#
# @return [nil]
#
def cancel! options = {}
respond :canceled, options
end
# @param [Hash] options
#
# @option options [String] :details (nil)
#
# @option options [String] :reason (nil)
#
# @return [nil]
#
def fail! options = {}
respond :failed, options
end
def responded?
!!@responded
end
# Responds to one of the `respond_activity_task_` methods with a set of options. This method is called when any
# of the {#complete!}, {#cancel!}, or {#fail!} methods is invoked.
#
# @note Only one response can be logged per `ActivityTask` instance; If this task has already logged a response,
# `respond` will raise an exception.
#
# @param [String] status
# The status of the response: "canceled", "completed", or "failed".
#
# @param [Hash] options
# Options to provide to the respond_activity_task function that will be called.
#
protected
def respond status, options
raise "already responded" if responded?
@responded = status
options[:task_token] = task_token
client.send("respond_activity_task_#{status}", options)
nil
end
end
end
end
|