# frozen_string_literal: true
require "sidekiq/processor"
require "set"
module Sidekiq
##
# The Manager is the central coordination point in Sidekiq, controlling
# the lifecycle of the Processors.
#
# Tasks:
#
# 1. start: Spin up Processors.
# 2. processor_result: Handle a Processor exiting (e.g. after a job failure), throw away the Processor, create a new one.
# 3. quiet: shut down idle Processors.
# 4. stop: hard stop the Processors by deadline.
#
# Note that only the last task requires its own Thread since it has to monitor
# the shutdown process. The other tasks are performed by other threads.
#
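# A minimal lifecycle sketch, using only the public methods defined below.
# It assumes `capsule` is an already configured Sidekiq::Capsule, and the
# 25 second shutdown budget is an illustrative value, not one taken from
# this file:
#
#   manager = Sidekiq::Manager.new(capsule)
#   manager.start
#   # ... later, when shutting down ...
#   manager.quiet
#   deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + 25
#   manager.stop(deadline)
#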
class Manager
include Sidekiq::Component
attr_reader :workers
attr_reader :capsule
def initialize(capsule)
@config = @capsule = capsule
@count = capsule.concurrency
raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
@done = false
@workers = Set.new
@plock = Mutex.new
@count.times do
@workers << Processor.new(@config, &method(:processor_result))
end
end
def start
@workers.each(&:start)
end
def quiet
return if @done
@done = true
logger.info { "Terminating quiet threads for #{capsule.name} capsule" }
@workers.each(&:terminate)
end
def stop(deadline)
quiet
# some of the shutdown events can be async,
# we don't have any way to know when they're done but
# give them a little time to take effect
sleep PAUSE_TIME
return if @workers.empty?
logger.info { "Pausing to allow jobs to finish..." }
wait_for(deadline) { @workers.empty? }
return if @workers.empty?
hard_shutdown
ensure
capsule.stop
end
def processor_result(processor, reason = nil)
@plock.synchronize do
@workers.delete(processor)
unless @done
p = Processor.new(@config, &method(:processor_result))
@workers << p
p.start
end
end
end
def stopped?
@done
end
private
def hard_shutdown
# We've reached the timeout and we still have busy threads.
# They must die but their jobs shall live on.
cleanup = nil
@plock.synchronize do
cleanup = @workers.dup
end
if cleanup.size > 0
jobs = cleanup.map { |p| p.job }.compact
logger.warn { "Terminating #{cleanup.size} busy threads" }
logger.debug { "Jobs still in progress #{jobs.inspect}" }
# Re-enqueue unfinished jobs
# NOTE: You may notice that we may push a job back to redis before
# the thread is terminated. This is ok because Sidekiq's
# contract says that jobs are run AT LEAST once. Process termination
# is delayed until we're certain the jobs are back in Redis because
# it is worse to lose a job than to run it twice.
capsule.fetcher.bulk_requeue(jobs)
end
cleanup.each do |processor|
processor.kill
end
# when this method returns, we immediately call `exit` which may not give
# the remaining threads time to run `ensure` blocks, etc. We pause here up
# to 3 seconds to give threads a minimal amount of time to run `ensure` blocks.
deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + 3
wait_for(deadline) { @workers.empty? }
end
# hack for quicker development / testing environment #2774
PAUSE_TIME = $stdout.tty? ? 0.1 : 0.5
# Wait until condblock returns true or the deadline passes.
def wait_for(deadline, &condblock)
remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
while remaining > PAUSE_TIME
return if condblock.call
sleep PAUSE_TIME
remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
end
end
end
end