#!/usr/bin/env ruby
# frozen_string_literal: true

require "fileutils"
require "sidekiq/api"

class Sidekiq::Monitor
  class Status
    VALID_SECTIONS = %w[all version overview processes queues]
    COL_PAD = 2

    def display(section = nil)
      section ||= "all"
      unless VALID_SECTIONS.include? section
        puts "I don't know how to check the status of '#{section}'!"
        puts "Try one of these: #{VALID_SECTIONS.join(", ")}"
        return
      end
      send(section)
    end

    def all
      version
      puts
      overview
      puts
      processes
      puts
      queues
    end

    def version
      puts "Sidekiq #{Sidekiq::VERSION}"
      puts Time.now.utc
    end

    def overview
      puts "---- Overview ----"
      puts "  Processed: #{delimit stats.processed}"
      puts "     Failed: #{delimit stats.failed}"
      puts "       Busy: #{delimit stats.workers_size}"
      puts "   Enqueued: #{delimit stats.enqueued}"
      puts "    Retries: #{delimit stats.retry_size}"
      puts "  Scheduled: #{delimit stats.scheduled_size}"
      puts "       Dead: #{delimit stats.dead_size}"
    end

    def processes
      puts "---- Processes (#{process_set.size}) ----"
      process_set.each_with_index do |process, index|
        # Keep compatibility with legacy versions since we don't want to break sidekiqmon during rolling upgrades or downgrades.
        #
        # Before:
        #   ["default", "critical"]
        #
        # After:
        #   {"default" => 1, "critical" => 10}
        queues =
          if process["weights"]
            process["weights"].sort_by { |queue| queue[0] }.map { |capsule| capsule.map { |name, weight| (weight > 0) ? "#{name}: #{weight}" : name }.join(", ") }
          else
            process["queues"].sort
          end

        puts "#{process["identity"]} #{tags_for(process)}"
        puts "  Started: #{Time.at(process["started_at"])} (#{time_ago(process["started_at"])})"
        puts "  Threads: #{process["concurrency"]} (#{process["busy"]} busy)"
        # The label below is 11 characters wide, matching pad: 11 for wrapped lines.
        puts "   Queues: #{split_multiline(queues, pad: 11)}"
        puts "  Version: #{process["version"] || "Unknown"}" if process["version"] != Sidekiq::VERSION
        puts "" unless (index + 1) == process_set.size
      end
    end

    def queues
      puts "---- Queues (#{queue_data.size}) ----"
      columns = {
        name: [:ljust, (["name"] + queue_data.map(&:name)).map(&:length).max + COL_PAD],
        size: [:rjust, (["size"] + queue_data.map(&:size)).map(&:length).max + COL_PAD],
        latency: [:rjust, (["latency"] + queue_data.map(&:latency)).map(&:length).max + COL_PAD]
      }
      columns.each { |col, (dir, width)| print col.to_s.upcase.public_send(dir, width) }
      puts
      queue_data.each do |q|
        columns.each do |col, (dir, width)|
          print q.send(col).public_send(dir, width)
        end
        puts
      end
    end

    private

    # Insert thousands separators, e.g. 1234567 becomes "1,234,567".
    def delimit(number)
      number.to_s.reverse.scan(/.{1,3}/).join(",").reverse
    end
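
    # Join values with ", ", wrapping onto a new line (indented by :pad
    # spaces) whenever the current line would exceed :max_length.
    def split_multiline(values, opts = {})
      return "none" unless values
      pad = opts[:pad] || 0
      max_length = opts[:max_length] || (80 - pad)
      out = []
      line = +""
      values.each do |value|
        if (line.length + value.length) > max_length
          out << line
          line = " " * pad
        end
        line << value + ", "
      end
      # Drop the trailing ", " from the last line.
      out << line[0..-3]
      out.join("\n")
    end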

    def tags_for(process)
      tags = [
        process["tag"],
        process["labels"],
        ((process["quiet"] == "true") ? "quiet" : nil)
      ].flatten.compact
      tags.any? ? "[#{tags.join("] [")}]" : nil
    end

    # Render the age of a Unix timestamp in coarse units; anything older
    # than two hours is reported in whole hours.
    def time_ago(timestamp)
      seconds = Time.now - Time.at(timestamp)
      return "just now" if seconds < 60
      return "a minute ago" if seconds < 120
      return "#{seconds.floor / 60} minutes ago" if seconds < 3600
      return "an hour ago" if seconds < 7200
      "#{seconds.floor / 60 / 60} hours ago"
    end

    QUEUE_STRUCT = Struct.new(:name, :size, :latency)
    def queue_data
      @queue_data ||= Sidekiq::Queue.all.map { |q|
        QUEUE_STRUCT.new(q.name, q.size.to_s, sprintf("%#.2f", q.latency))
      }
    end

    def process_set
      @process_set ||= Sidekiq::ProcessSet.new
    end

    def stats
      @stats ||= Sidekiq::Stats.new
    end
  end
end