1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
|
# frozen_string_literal: true
require_relative "iterable/enumerators"
module Sidekiq
module Job
# Raised by Iterable#reenqueue_iteration_job when a job stops before its
# enumerator is exhausted, after the iteration state has been saved.
class Interrupted < ::RuntimeError
end
module Iterable
include Enumerators
# @api private
#
# Module hook run when a job class includes Iterable: it extends that class
# with ClassMethods, which makes the class reject its own #perform.
def self.included(job_class)
  job_class.extend(ClassMethods)
end
# @api private
#
# Class-level methods added to every job class that includes Iterable.
module ClassMethods
  # Ruby invokes this after each instance method is defined on the class.
  # Iterable supplies #perform itself, so a job class that defines its own
  # #perform is rejected with a RuntimeError at definition time.
  def method_added(name)
    if name == :perform
      raise "#{self} is an iterable job and must not define #perform"
    end

    super
  end
end
# @api private
#
# Sets the iteration bookkeeping to its starting values; #perform may later
# overwrite them with state restored from Redis.
def initialize
  super

  @_executions = 0   # executions counted so far (incremented by #perform)
  @_cursor = nil     # passed to #build_enumerator as cursor:; nil when nothing was saved
  @_start_time = nil # monotonic clock reading taken when #perform starts
  @_runtime = 0      # seconds spent iterating, summed across executions
end
# Hook for subclasses: runs just before iterating on the job's first
# execution (when the execution counter becomes 1); later executions call
# #on_resume instead. The counter is only persisted by #flush_state, so a
# run that fails before any state was saved may call this again on retry.
def on_start; end
# Hook for subclasses: wraps every call to #each_iteration, e.g. for metrics
# or timing. An override must yield, or the current item is not processed;
# the default simply yields and returns the block's value.
def around_iteration; yield; end
# Hook for subclasses: runs instead of #on_start when the restored execution
# counter shows this job already ran before.
def on_resume; end
# Hook for subclasses: runs every time the iteration loop in #perform ends,
# whether it ran to completion, was interrupted (e.g. Sidekiq stopping) or
# was aborted with throw(:abort). It runs before #on_complete or the
# re-enqueue. It does not run when #build_enumerator returns nil or when an
# exception escapes the loop.
def on_stop; end
# Hook for subclasses: runs once iteration is finished, i.e. the enumerator
# was exhausted or the loop was aborted with throw(:abort) carrying no value
# or true. It runs just before the saved iteration state is deleted.
def on_complete; end
# Subclasses must override this to supply the Enumerator to iterate.
#
# #perform calls it with the job's arguments plus a +cursor:+ keyword. The
# keyword is nil when no cursor was saved, and the last saved cursor when
# resuming. The Enumerator should yield an object together with its cursor.
# Returning nil makes #perform log a message and skip the job.
#
# @return [Enumerator, nil]
# @raise [NotImplementedError] always, in this default implementation
def build_enumerator(*)
  message = "#{self.class.name} must implement a '#build_enumerator' method"
  raise NotImplementedError, message
end
# Subclasses must override this with the work to do for a single item.
#
# It is called with the object yielded by the enumerator followed by the
# job's original arguments, wrapped in #around_iteration.
#
# @return [void]
# @raise [NotImplementedError] always, in this default implementation
def each_iteration(*)
  message = "#{self.class.name} must implement an '#each_iteration' method"
  raise NotImplementedError, message
end
# Name of the Redis hash holding this job's iteration state: one key per
# jid, prefixed with "it-".
def iteration_key
  format("it-%s", jid)
end
# @api private
#
# Entry point for the job. It restores any saved iteration state, builds the
# enumerator from the job arguments and the saved cursor, and iterates. On
# completion it runs #on_complete and deletes the saved state. Otherwise it
# saves state and raises Interrupted via #reenqueue_iteration_job.
#
# @param arguments [Array] the job's arguments, forwarded to
#   #build_enumerator and #each_iteration
def perform(*arguments)
  fetch_previous_iteration_state
  @_executions += 1
  @_start_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

  enumerator = build_enumerator(*arguments, cursor: @_cursor)
  if enumerator
    assert_enumerator!(enumerator)
  else
    # Any falsy return means there is nothing to iterate for this run.
    logger.info("'#build_enumerator' returned nil, skipping the job.")
    return
  end

  # The execution counter distinguishes a first run from a resumed one.
  @_executions == 1 ? on_start : on_resume

  # A throw(:abort[, value]) from inside the loop lands here; the value is
  # interpreted by #handle_completed.
  outcome = catch(:abort) { iterate_with_enumerator(enumerator, arguments) }
  on_stop

  return reenqueue_iteration_job unless handle_completed(outcome)

  on_complete
  cleanup
end
private
# Restores this job's iteration state (execution count, cursor, accumulated
# runtime) from the Redis hash at #iteration_key. When the hash is empty the
# values set by #initialize are left untouched.
def fetch_previous_iteration_state
  state = Sidekiq.redis { |conn| conn.hgetall(iteration_key) }
  return if state.empty?

  @_executions = state["ex"].to_i
  # Only decode a cursor that was actually saved. Decoding nil would raise:
  # JSON.parse raises TypeError on nil, and Sidekiq.load_json presumably
  # wraps it (TODO confirm). A hash without "c" has no cursor to resume from.
  saved_cursor = state["c"]
  @_cursor = Sidekiq.load_json(saved_cursor) unless saved_cursor.nil?
  @_runtime = state["rt"].to_f
end
# Minimum number of seconds between periodic writes of the iteration state
# while iterating (an interruption forces an immediate write).
STATE_FLUSH_INTERVAL = 5
# Expiry for the saved state, in seconds (30 days). We need to keep the state
# around as long as the job might be retrying.
STATE_TTL = 30 * 86_400
# Walks +enumerator+, passing each yielded object (plus the job arguments)
# to #each_iteration inside #around_iteration.
#
# Before each item, the cursor yielded alongside it is stored in @_cursor.
# State is written via #flush_state when #interrupted? is true or when
# STATE_FLUSH_INTERVAL seconds have passed since the last write.
#
# @return [Boolean] false when stopped by #interrupted? (the current item is
#   NOT handed to #each_iteration); true once the enumerator is exhausted
# Elapsed time is always added to @_runtime, even on early return or error.
def iterate_with_enumerator(enumerator, arguments)
  saw_item = false
  last_flush_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

  enumerator.each do |object, cursor|
    saw_item = true
    @_cursor = cursor

    stop_now = interrupted?
    since_flush = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - last_flush_at
    if stop_now || since_flush >= STATE_FLUSH_INTERVAL
      flush_state
      last_flush_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
    end

    # `return` leaves the whole method (not just this block); `ensure` still runs.
    return false if stop_now

    around_iteration { each_iteration(object, *arguments) }
  end

  logger.debug("Enumerator found nothing to iterate!") unless saw_item
  true
ensure
  @_runtime += ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - @_start_time
end
# Saves the current iteration state and raises Interrupted so this execution
# stops. The next execution restores the saved cursor through
# #fetch_previous_iteration_state. NOTE(review): re-enqueueing on Interrupted
# presumably happens in Sidekiq's job processor, which is not visible here.
def reenqueue_iteration_job
  flush_state
  logger.debug { format("Interrupting job (cursor=%s)", @_cursor.inspect) }
  raise(Interrupted)
end
# Ensures #build_enumerator returned a real Enumerator.
#
# @param enum [Object] the value returned by #build_enumerator
# @return [nil] when +enum+ is an Enumerator (including Enumerator::Lazy)
# @raise [ArgumentError] otherwise, with a usage example in the message
def assert_enumerator!(enum)
  return if enum.is_a?(Enumerator)

  raise ArgumentError, <<~MSG
    #build_enumerator must return an Enumerator, but returned #{enum.class}.
    Example:
    def build_enumerator(params, cursor:)
    active_record_records_enumerator(
    Shop.find(params["shop_id"]).products,
    cursor: cursor
    )
    end
  MSG
end
# Persists the iteration state to the Redis hash at #iteration_key.
# "ex" holds the execution count, "c" the cursor encoded with
# Sidekiq.dump_json, and "rt" the accumulated runtime in seconds. The HSET
# and the STATE_TTL expiry are issued together inside conn.multi.
def flush_state
  key = iteration_key
  snapshot = {"ex" => @_executions, "c" => Sidekiq.dump_json(@_cursor), "rt" => @_runtime}

  Sidekiq.redis do |conn|
    conn.multi do |transaction|
      transaction.hset(key, snapshot)
      transaction.expire(key, STATE_TTL)
    end
  end
end
# Logs a completion summary (execution count and runtime) and deletes the
# saved iteration state from Redis.
def cleanup
  logger.debug do
    format("Completed iteration. executions=%d runtime=%.3f", @_executions, @_runtime)
  end

  Sidekiq.redis { |redis| redis.unlink(iteration_key) }
end
# Maps the value produced by catch(:abort) in #perform to a completion flag.
#
#   true  - the enumerator ran to the end (or throw(:abort, true))
#   false - iteration was interrupted
#   nil   - throw(:abort) without a value: someone aborted the job but still
#           wants the on_complete callback, so it counts as completed
#
# @param completed [true, false, nil]
# @return [Boolean]
# @raise [RuntimeError] for any other thrown value
def handle_completed(completed)
  case completed
  when true, nil then true
  when false then false
  else raise "Unexpected thrown value: #{completed.inspect}"
  end
end
end
end
end
|