File: sidekiqload

package info (click to toggle)
ruby-sidekiq 6.4.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 792 kB
  • sloc: ruby: 4,582; makefile: 20; sh: 6
file content (155 lines) | stat: -rwxr-xr-x 3,967 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env ruby

# Quiet some warnings we see when running in warning mode:
# RUBYOPT=-w bundle exec sidekiq
# NOTE(review): presumably Sidekiq reads $TESTING and Ruby would otherwise warn
# about an uninitialized global -- confirm against the library code.
$TESTING = false

# require "ruby-prof"
require "bundler/setup"
Bundler.require(:default, :load_test)

require_relative "../lib/sidekiq/cli"
require_relative "../lib/sidekiq/launcher"

# Server-side settings for the load test.
Sidekiq.configure_server do |server|
  # Ten worker threads consuming the "default" queue.
  server.options[:concurrency] = 10
  # Port 6380 is the Toxiproxy listener configured further down in this script,
  # not Redis itself.
  server.redis = {db: 13, port: 6380}
  # server.redis = { db: 13, port: 6380, driver: :hiredis}
  server.options[:queues].push("default")
  # Log errors only; this script reports its own progress at ERROR level.
  server.logger.level = Logger::ERROR
  server.average_scheduled_poll_interval = 2
  # Only available when the commercial Sidekiq::Pro constant is loaded.
  server.reliable! if defined?(Sidekiq::Pro)
end

# Minimal job used to generate load: it does no work beyond an optional
# latency printout.
class LoadWorker
  include Sidekiq::Worker

  # Allow a single retry; the retry-in block always answers 1
  # (the delay Sidekiq should wait before retrying).
  sidekiq_options retry: 1
  sidekiq_retry_in { |_count| 1 }

  # idx - job index (only referenced by the commented-out failure injection).
  # ts  - optional Float timestamp from Time.now.to_f taken at enqueue time;
  #       when present, the difference to the current time is printed.
  def perform(idx, ts = nil)
    puts(Time.now.to_f - ts) unless ts.nil?
    # raise idx.to_s if idx % 100 == 1
  end
end

# Toxiproxy prerequisites:
#   brew tap shopify/shopify
#   brew install toxiproxy
#   gem install toxiproxy
# Then run `toxiproxy-server` in a separate terminal window.
require "toxiproxy"

# Simulate a non-localhost network for more realistic conditions.
# Adding 1ms of network latency has an ENORMOUS impact on benchmarks.
# The proxy listens on 127.0.0.1:6380 and forwards to 127.0.0.1:6379.
redis_proxy = {
  name: "redis",
  listen: "127.0.0.1:6380",
  upstream: "127.0.0.1:6379"
}
Toxiproxy.populate([redis_proxy])

# Self-pipe for signal handling: each trap handler only writes the signal's
# name into the pipe; the main loop at the bottom of the script reads it back
# and dispatches it.
self_read, self_write = IO.pipe
%w[INT TERM TSTP TTIN].each do |sig|
  Signal.trap(sig) { self_write.puts(sig) }
rescue ArgumentError
  # trap raised ArgumentError for this signal name; report it and carry on.
  puts "Signal #{sig} not supported"
end

# Flush the Redis database Sidekiq is connected to before any jobs are pushed.
Sidekiq.redis { |conn| conn.flushdb }
# Reacts to a signal name read from the self-pipe.
#
# launcher - object responding to #quiet (a Sidekiq::Launcher in this script).
# sig      - signal name as a String: "INT", "TERM", "TSTP" or "TTIN".
#            Any other value is logged at debug level and otherwise ignored.
#
# Raises Interrupt for "INT" and "TERM".
def handle_signal(launcher, sig)
  Sidekiq.logger.debug "Got #{sig} signal"
  case sig
  when "INT", "TERM"
    # INT: handle Ctrl-C in JRuby like MRI
    #      http://jira.codehaus.org/browse/JRUBY-4637
    # TERM: Heroku sends TERM and then waits 30 seconds for process to exit.
    raise Interrupt
  when "TSTP"
    Sidekiq.logger.info "Received TSTP, no longer accepting new work"
    launcher.quiet
  when "TTIN"
    # Dump every thread's label and backtrace to the log.
    Thread.list.each do |thread|
      tid = (thread.object_id ^ ::Process.pid).to_s(36)
      Sidekiq.logger.warn "Thread TID-#{tid} #{thread["label"]}"
      # Read the backtrace exactly once: a thread can finish between two calls,
      # in which case a second call returns nil and joining it would raise.
      trace = thread.backtrace
      Sidekiq.logger.warn(trace ? trace.join("\n") : "<no backtrace available>")
    end
  end
end

# Returns the resident set size of the current process as an Integer, taken
# from the output of `ps -o rss=` (units are whatever ps reports -- typically
# kilobytes, confirm for your platform). Non-numeric output yields 0.
def Process.rss
  ps_output = `ps -o rss= -p #{pid}`
  ps_output.chomp.to_i
end

# Workload size: `iter` bulk pushes of `count` jobs each.
iter = 50
count = 10_000

iter.times do
  # One single-element argument list per job: [[0], [1], ..., [count - 1]].
  job_args = Array.new(count) { |idx| [idx] }
  Sidekiq::Client.push_bulk("class" => LoadWorker, "args" => job_args)
end
Sidekiq.logger.error "Created #{count * iter} jobs"

# Benchmark start, taken after all jobs are enqueued. A monotonic clock is used
# so the reported duration is not skewed by wall-clock adjustments.
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

# Background thread that polls the length of "queue:default" every 0.2s.
# Once the queue is empty it logs the total run time, enqueues three jobs
# carrying their enqueue timestamp (so LoadWorker prints their latency),
# waits 0.2s and then exits the process with status 0.
Monitoring = Thread.new do
  loop do
    sleep 0.2
    pending = Sidekiq.redis { |conn| conn.llen("queue:default") }
    # Sidekiq.logger.error("RSS: #{Process.rss} Pending: #{pending}")
    next unless pending == 0

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
    Sidekiq.logger.error("Done, #{iter * count} jobs in #{elapsed} sec")
    Sidekiq.logger.error("Now here's the latency for three jobs")

    # These stay wall-clock: LoadWorker#perform subtracts them from Time.now.to_f.
    LoadWorker.perform_async(1, Time.now.to_f)
    LoadWorker.perform_async(2, Time.now.to_f)
    LoadWorker.perform_async(3, Time.now.to_f)

    sleep 0.2
    exit(0)
  end
end

# Main flow: fire Sidekiq's startup lifecycle hooks, start a launcher behind a
# Toxiproxy latency toxic, then block on the self-pipe and dispatch each
# signal name written by the trap handlers.
begin
  # RubyProf::exclude_threads = [ Monitoring ]
  # RubyProf.start
  startup_hooks = Sidekiq.options[:lifecycle_events][:startup]
  startup_hooks.each(&:call)
  startup_hooks.clear

  Sidekiq.logger.error "Simulating 1ms of latency between Sidekiq and redis"
  Toxiproxy[:redis].downstream(:latency, latency: 1).apply do
    launcher = Sidekiq::Launcher.new(Sidekiq.options)
    launcher.run

    # IO.select blocks until the read end of the pipe has a signal name ready.
    while (ready = IO.select([self_read]))
      pipe = ready.first.first
      handle_signal(launcher, pipe.gets.strip)
    end
  end
rescue SystemExit
  # The Monitoring thread calls exit(0) once the queue drains; swallowing the
  # resulting SystemExit here is the normal way this script finishes.
  # Sidekiq.logger.error("Profiling...")
  # result = RubyProf.stop
  # printer = RubyProf::GraphHtmlPrinter.new(result)
  # printer.print(File.new("output.html", "w"), :min_percent => 1)
  # normal
rescue => error
  # With $DEBUG set, let the exception propagate; otherwise print it and fail.
  raise error if $DEBUG
  warn error.message
  warn error.backtrace.join("\n")
  exit 1
end