File: monitor.rb

package info (click to toggle)
ruby-sidekiq 7.3.2%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 956 kB
  • sloc: ruby: 6,094; javascript: 526; makefile: 21; sh: 20
file content (147 lines) | stat: -rw-r--r-- 4,356 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/env ruby
# frozen_string_literal: true

require "fileutils"
require "sidekiq/api"

class Sidekiq::Monitor
  class Status
    VALID_SECTIONS = %w[all version overview processes queues]
    COL_PAD = 2

    def display(section = nil)
      section ||= "all"
      unless VALID_SECTIONS.include? section
        puts "I don't know how to check the status of '#{section}'!"
        puts "Try one of these: #{VALID_SECTIONS.join(", ")}"
        return
      end
      send(section)
    end

    def all
      version
      puts
      overview
      puts
      processes
      puts
      queues
    end

    def version
      puts "Sidekiq #{Sidekiq::VERSION}"
      puts Time.now.utc
    end

    def overview
      puts "---- Overview ----"
      puts "  Processed: #{delimit stats.processed}"
      puts "     Failed: #{delimit stats.failed}"
      puts "       Busy: #{delimit stats.workers_size}"
      puts "   Enqueued: #{delimit stats.enqueued}"
      puts "    Retries: #{delimit stats.retry_size}"
      puts "  Scheduled: #{delimit stats.scheduled_size}"
      puts "       Dead: #{delimit stats.dead_size}"
    end

    def processes
      puts "---- Processes (#{process_set.size}) ----"
      process_set.each_with_index do |process, index|
        # Keep compatibility with legacy versions since we don't want to break sidekiqmon during rolling upgrades or downgrades.
        #
        # Before:
        #   ["default", "critical"]
        #
        # After:
        #   {"default" => 1, "critical" => 10}
        queues =
          if process["weights"]
            process["weights"].sort_by { |queue| queue[0] }.map { |capsule| capsule.map { |name, weight| (weight > 0) ? "#{name}: #{weight}" : name }.join(", ") }
          else
            process["queues"].sort
          end

        puts "#{process["identity"]} #{tags_for(process)}"
        puts "  Started: #{Time.at(process["started_at"])} (#{time_ago(process["started_at"])})"
        puts "  Threads: #{process["concurrency"]} (#{process["busy"]} busy)"
        puts "   Queues: #{split_multiline(queues, pad: 11)}"
        puts "  Version: #{process["version"] || "Unknown"}" if process["version"] != Sidekiq::VERSION
        puts "" unless (index + 1) == process_set.size
      end
    end

    def queues
      puts "---- Queues (#{queue_data.size}) ----"
      columns = {
        name: [:ljust, (["name"] + queue_data.map(&:name)).map(&:length).max + COL_PAD],
        size: [:rjust, (["size"] + queue_data.map(&:size)).map(&:length).max + COL_PAD],
        latency: [:rjust, (["latency"] + queue_data.map(&:latency)).map(&:length).max + COL_PAD]
      }
      columns.each { |col, (dir, width)| print col.to_s.upcase.public_send(dir, width) }
      puts
      queue_data.each do |q|
        columns.each do |col, (dir, width)|
          print q.send(col).public_send(dir, width)
        end
        puts
      end
    end

    private

    def delimit(number)
      number.to_s.reverse.scan(/.{1,3}/).join(",").reverse
    end

    def split_multiline(values, opts = {})
      return "none" unless values
      pad = opts[:pad] || 0
      max_length = opts[:max_length] || (80 - pad)
      out = []
      line = +""
      values.each do |value|
        if (line.length + value.length) > max_length
          out << line
          line = " " * pad
        end
        line << value + ", "
      end
      out << line[0..-3]
      out.join("\n")
    end

    def tags_for(process)
      tags = [
        process["tag"],
        process["labels"],
        ((process["quiet"] == "true") ? "quiet" : nil)
      ].flatten.compact
      tags.any? ? "[#{tags.join("] [")}]" : nil
    end

    def time_ago(timestamp)
      seconds = Time.now - Time.at(timestamp)
      return "just now" if seconds < 60
      return "a minute ago" if seconds < 120
      return "#{seconds.floor / 60} minutes ago" if seconds < 3600
      return "an hour ago" if seconds < 7200
      "#{seconds.floor / 60 / 60} hours ago"
    end

    QUEUE_STRUCT = Struct.new(:name, :size, :latency)
    def queue_data
      @queue_data ||= Sidekiq::Queue.all.map { |q|
        QUEUE_STRUCT.new(q.name, q.size.to_s, sprintf("%#.2f", q.latency))
      }
    end

    def process_set
      @process_set ||= Sidekiq::ProcessSet.new
    end

    def stats
      @stats ||= Sidekiq::Stats.new
    end
  end
end