1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
# frozen_string_literal: true
require 'sidekiq/manager'
require 'sidekiq/fetch'
require 'sidekiq/scheduled'
module Sidekiq
# The Launcher is a very simple Actor whose job is to
# start, monitor and stop the core Actors in Sidekiq.
# If any of these actors die, the Sidekiq process exits
# immediately.
class Launcher
include Util
attr_accessor :manager, :poller, :fetcher
STATS_TTL = 5*365*24*60*60
def initialize(options)
@manager = Sidekiq::Manager.new(options)
@poller = Sidekiq::Scheduled::Poller.new
@done = false
@options = options
end
def run
@thread = safe_thread("heartbeat", &method(:start_heartbeat))
@poller.start
@manager.start
end
# Stops this instance from processing any more jobs,
#
def quiet
@done = true
@manager.quiet
@poller.terminate
end
# Shuts down the process. This method does not
# return until all work is complete and cleaned up.
# It can take up to the timeout to complete.
def stop
deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + @options[:timeout]
@done = true
@manager.quiet
@poller.terminate
@manager.stop(deadline)
# Requeue everything in case there was a worker who grabbed work while stopped
# This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
strategy = (@options[:fetch] || Sidekiq::BasicFetch)
strategy.bulk_requeue([], @options)
clear_heartbeat
end
def stopping?
@done
end
private unless $TESTING
def heartbeat
results = Sidekiq::CLI::PROCTITLES.map {|x| x.(self, to_data) }
results.compact!
$0 = results.join(' ')
❤
end
def ❤
key = identity
fails = procd = 0
begin
fails = Processor::FAILURE.reset
procd = Processor::PROCESSED.reset
curstate = Processor::WORKER_STATE.dup
workers_key = "#{key}:workers"
nowdate = Time.now.utc.strftime("%Y-%m-%d")
Sidekiq.redis do |conn|
conn.multi do
conn.incrby("stat:processed", procd)
conn.incrby("stat:processed:#{nowdate}", procd)
conn.expire("stat:processed:#{nowdate}", STATS_TTL)
conn.incrby("stat:failed", fails)
conn.incrby("stat:failed:#{nowdate}", fails)
conn.expire("stat:failed:#{nowdate}", STATS_TTL)
conn.del(workers_key)
curstate.each_pair do |tid, hash|
conn.hset(workers_key, tid, Sidekiq.dump_json(hash))
end
conn.expire(workers_key, 60)
end
end
fails = procd = 0
_, exists, _, _, msg = Sidekiq.redis do |conn|
conn.multi do
conn.sadd('processes', key)
conn.exists(key)
conn.hmset(key, 'info', to_json, 'busy', curstate.size, 'beat', Time.now.to_f, 'quiet', @done)
conn.expire(key, 60)
conn.rpop("#{key}-signals")
end
end
# first heartbeat or recovering from an outage and need to reestablish our heartbeat
fire_event(:heartbeat) if !exists
return unless msg
::Process.kill(msg, $$)
rescue => e
# ignore all redis/network issues
logger.error("heartbeat: #{e.message}")
# don't lose the counts if there was a network issue
Processor::PROCESSED.incr(procd)
Processor::FAILURE.incr(fails)
end
end
def start_heartbeat
while true
heartbeat
sleep 5
end
Sidekiq.logger.info("Heartbeat stopping...")
end
def to_data
@data ||= begin
{
'hostname' => hostname,
'started_at' => Time.now.to_f,
'pid' => $$,
'tag' => @options[:tag] || '',
'concurrency' => @options[:concurrency],
'queues' => @options[:queues].uniq,
'labels' => @options[:labels],
'identity' => identity,
}
end
end
def to_json
@json ||= begin
# this data changes infrequently so dump it to a string
# now so we don't need to dump it every heartbeat.
Sidekiq.dump_json(to_data)
end
end
def clear_heartbeat
# Remove record from Redis since we are shutting down.
# Note we don't stop the heartbeat thread; if the process
# doesn't actually exit, it'll reappear in the Web UI.
Sidekiq.redis do |conn|
conn.pipelined do
conn.srem('processes', identity)
conn.del("#{identity}:workers")
end
end
rescue
# best effort, ignore network errors
end
end
end
|