1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
require 'sidekiq'
require 'sidekiq/util'
require 'sidekiq/actor'
module Sidekiq
##
# The Fetcher blocks on Redis, waiting for a message to process
# from the queues. It gets the message and hands it to the Manager
# to assign to a ready Processor.
class Fetcher
  include Util
  include Actor

  # Seconds slept by #pause after a failed fetch; BasicFetch#queues_cmd also
  # appends this value as the last argument of its brpop call.
  TIMEOUT = 1

  # Time at which the current run of fetch failures started, or nil when the
  # most recent fetch succeeded.
  attr_reader :down

  class << self
    # Ugh. Say hello to a bloody hack.
    # Can't find a clean way to get the fetcher to just stop processing
    # its mailbox when shutdown starts, so a class-level flag is checked
    # at the top of every #fetch instead.
    def done!
      @done = true
    end

    # Clears the shutdown flag (testing only).
    def reset
      @done = nil
    end

    # Truthy once done! has been called and reset has not.
    def done?
      @done
    end

    # The class used to retrieve work: the configured :fetch option if
    # present, otherwise BasicFetch.
    def strategy
      Sidekiq.options[:fetch] || BasicFetch
    end
  end

  # mgr     - receives fetched work through mgr.async.assign
  # options - passed straight through to the fetch strategy's constructor
  def initialize(mgr, options)
    @mgr = mgr
    @strategy = Fetcher.strategy.new(options)
    @down = nil
  end

  # Fetching is straightforward: the Manager makes a fetch
  # request for each idle processor when Sidekiq starts and
  # then issues a new fetch request every time a Processor
  # finishes a message.
  #
  # Because we have to shut down cleanly, we can't block
  # forever and we can't loop forever. Instead we reschedule
  # a new fetch if the current fetch turned up nothing.
  def fetch
    watchdog('Fetcher#fetch died') do
      # Shutdown has started: drop this request without touching Redis.
      return if Sidekiq::Fetcher.done?

      begin
        unit = @strategy.retrieve_work

        # A successful retrieve after a failure means Redis is reachable
        # again; report how long the outage lasted and clear the marker.
        if @down
          outage = Time.now.to_f - @down.to_f
          ::Sidekiq.logger.info("Redis is online, #{outage} sec downtime")
        end
        @down = nil

        if unit
          @mgr.async.assign(unit)
        else
          # Nothing was available; queue another attempt.
          after(0) { fetch }
        end
      rescue => ex
        handle_fetch_exception(ex)
      end
    end
  end

  private

  # Back off before the next attempt after a failed fetch.
  def pause
    sleep(TIMEOUT)
  end

  # Logs the first failure of an outage (message plus backtrace), records
  # when the outage began, waits, and then schedules another fetch.
  # Later failures within the same outage are not logged again.
  def handle_fetch_exception(ex)
    unless @down
      logger.error("Error fetching message: #{ex}")
      ex.backtrace.each { |frame| logger.error(frame) }
      @down = Time.now
    end

    pause
    after(0) { fetch }
  rescue Task::TerminatedError
    # If redis is down when we try to shut down, all the fetch backlog
    # raises these errors. Haven't been able to figure out what I'm doing wrong.
  end
end
# Default fetch strategy: pops one message at a time from the configured
# queues with a blocking Redis brpop.
class BasicFetch
  # options - Hash read for:
  #   :queues - list of queue names; a name may appear more than once, which
  #             gives it a higher weight (see #queues_cmd)
  #   :strict - truthy to always poll the queues in their configured order
  def initialize(options)
    @strictly_ordered_queues = !!options[:strict]
    @queues = options[:queues].map { |q| "queue:#{q}" }
    @unique_queues = @queues.uniq
  end

  # Runs one brpop across the queues. The result is splatted into
  # UnitOfWork.new, so it is expected to be a [queue key, message] pair.
  # Returns nil when brpop returns nothing.
  def retrieve_work
    work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
    UnitOfWork.new(*work) if work
  end

  # By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it
  # an instance method will make it async to the Fetcher actor
  #
  # Pushes the messages of every unit in +inprogress+ back onto the queue it
  # came from, grouped per queue and sent in a single pipeline. +options+ is
  # accepted but not used here. Any StandardError is logged as a warning
  # rather than raised.
  def self.bulk_requeue(inprogress, options)
    return if inprogress.empty?

    Sidekiq.logger.debug { "Re-queueing terminated jobs" }
    jobs_to_requeue = {}
    inprogress.each do |unit_of_work|
      (jobs_to_requeue[unit_of_work.queue_name] ||= []) << unit_of_work.message
    end

    Sidekiq.redis do |conn|
      conn.pipelined do
        jobs_to_requeue.each do |queue, jobs|
          conn.rpush("queue:#{queue}", jobs)
        end
      end
    end
    Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis")
  rescue => ex
    Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
  end

  # One fetched message together with the Redis key it was popped from.
  UnitOfWork = Struct.new(:queue, :message) do
    def acknowledge
      # nothing to do
    end

    # The queue's name without the "queue:" key prefix (and without anything
    # that precedes that prefix in the key).
    #
    # The match is anchored and non-greedy so that only the FIRST "queue:"
    # is removed. A greedy /.*queue:/ would turn "queue:email_queue:high"
    # into "high", and a requeue would then push the job onto a different
    # list than the one it was fetched from.
    def queue_name
      queue.sub(/\A.*?queue:/, '')
    end

    # Pushes this message back onto the tail of its original queue.
    def requeue
      Sidekiq.redis do |conn|
        conn.rpush("queue:#{queue_name}", message)
      end
    end
  end

  # Creating the Redis#brpop command takes into account any
  # configured queue weights. By default Redis#brpop returns
  # data from the first queue that has pending elements. We
  # recreate the queue command each time we invoke Redis#brpop
  # to honor weights and avoid queue starvation.
  #
  # Returns a fresh Array of queue keys followed by the brpop timeout.
  def queues_cmd
    # Strict mode must dup: the timeout is appended in place below and
    # @unique_queues is reused across calls.
    queues = @strictly_ordered_queues ? @unique_queues.dup : @queues.shuffle.uniq
    queues << Sidekiq::Fetcher::TIMEOUT
  end
end
end
|