File: multi_queue_bench

Package: ruby-sidekiq 7.3.2+dfsg-1
#!/usr/bin/env ruby

#
# bin/multi_queue_bench is a helpful script to load test and
# performance tune Sidekiq's core. It is configurable via the
# following ENV variables.
#
# QUEUES
# Number of queues to consume from. Default is 8
#
# PROCESSES
# Number of consumer processes this benchmark will fork. Each process consumes
# from one of the available queues. When there are more processes than queues,
# queues are assigned to processes in round-robin order. Default is 8
#
# ELEMENTS
# Number of jobs pushed per batch to each queue. Default is 1000
#
# ITERATIONS
# Number of push batches per queue; in total, ITERATIONS * ELEMENTS jobs are pushed to each queue. Default is 1000
#
# PORT
# The port of the Redis (or Dragonfly) server. Default is 6379
#
# IP
# The IP of the Redis (or Dragonfly) server. Default is 127.0.0.1
#
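# THREADS
# Sidekiq concurrency (worker threads) per consumer process. Default is 30
#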
# Example Usage:
#
# > RUBY_YJIT_ENABLE=1 THREADS=10 PROCESSES=8 QUEUES=8 bin/multi_queue_bench
#
# None of this script is considered a public API and it may change over time.
#

# Quiet some warnings we see when running in warning mode:
# RUBYOPT=-w bundle exec sidekiq
$TESTING = false
puts RUBY_DESCRIPTION

require "bundler/setup"
Bundler.require(:default, :load_test)

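# The benchmark job: a near no-op that prints the enqueue-to-perform latency
# when a timestamp argument is supplied; retries once with a 1 second delay.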
class LoadWorker
  include Sidekiq::Job
  sidekiq_options retry: 1
  sidekiq_retry_in do |x|
    1
  end

  def perform(idx, ts = nil)
    puts(Time.now.to_f - ts) if !ts.nil?
    # raise idx.to_s if idx % 100 == 1
  end
end

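# Resident set size of the current process in kilobytes, as reported by ps.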
def Process.rss
  `ps -o rss= -p #{Process.pid}`.chomp.to_i
end

$iterations = ENV["ITERATIONS"] ? Integer(ENV["ITERATIONS"]) : 1_000
$elements = ENV["ELEMENTS"] ? Integer(ENV["ELEMENTS"]) : 1_000
$port = ENV["PORT"] ? Integer(ENV["PORT"]) : 6379
$ip = ENV["IP"] ? String(ENV["IP"]) : "127.0.0.1"

class Loader
  def initialize
    @iter = $iterations
    @count = $elements
  end

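  # Builds an embedded Sidekiq instance consuming from the given queue and
  # installs INT/TERM/TSTP/TTIN traps that write to a self-pipe.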
  def configure(queue)
    @x = Sidekiq.configure_embed do |config|
      config.redis = {db: 0, host: $ip, port: $port}
      config.concurrency = Integer(ENV.fetch("THREADS", "30"))
      config.queues = queue
      config.logger.level = Logger::WARN
      config.average_scheduled_poll_interval = 2
      config.reliable! if defined?(Sidekiq::Pro)
    end

    @self_read, @self_write = IO.pipe
    %w[INT TERM TSTP TTIN].each do |sig|
      trap sig do
        @self_write.puts(sig)
      end
    rescue ArgumentError
      puts "Signal #{sig} not supported"
    end
  end

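  # Mirror Sidekiq's CLI signal handling: INT/TERM raise Interrupt to stop the
  # run, TSTP quiets the embedded instance, TTIN dumps thread backtraces.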
  def handle_signal(sig)
    launcher = @x
    Sidekiq.logger.debug "Got #{sig} signal"
    case sig
    when "INT"
      # Handle Ctrl-C in JRuby like MRI
      # http://jira.codehaus.org/browse/JRUBY-4637
      raise Interrupt
    when "TERM"
      # Heroku sends TERM and then waits 30 seconds for process to exit.
      raise Interrupt
    when "TSTP"
      Sidekiq.logger.info "Received TSTP, no longer accepting new work"
      launcher.quiet
    when "TTIN"
      Thread.list.each do |thread|
        Sidekiq.logger.warn "Thread TID-#{(thread.object_id ^ ::Process.pid).to_s(36)} #{thread["label"]}"
        if thread.backtrace
          Sidekiq.logger.warn thread.backtrace.join("\n")
        else
          Sidekiq.logger.warn "<no backtrace available>"
        end
      end
    end
  end

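  # Pushes @iter batches of @count jobs each to the given queue via push_bulk.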
  def setup(queue)
    Sidekiq.logger.error("Setup RSS: #{Process.rss}")
    Sidekiq.logger.error("Pushing work to queue: #{queue}")
    start = Time.now
    @iter.times do
      arr = Array.new(@count) { |idx| [idx] }
      # Sidekiq always prepends "queue:" to the queue name,
      # which is why we pass 'q0', 'q1', etc. instead of 'queue:q0'
      Sidekiq::Client.push_bulk("class" => LoadWorker, "args" => arr, "queue" => queue)
      $stdout.write "."
    end
    puts "Done"
  end

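  # Polls the length of a single queue every 0.2s; once it is empty, stops the
  # embedded instance and signals this process to exit.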
  def monitor_single(queue)
    q = "queue:#{queue}"
    @monitor_single = Thread.new do
      GC.start
      loop do
        sleep 0.2
        total = Sidekiq.redis do |conn|
          conn.llen q
        end

        if total == 0
          sleep 0.1
          @x.stop
          Process.kill("INT", $$)
          break
        end

      end
    end
  end

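  # Polls the combined length of all queues; once everything has drained, logs
  # total throughput and ending RSS, then stops the embedded instance and
  # signals this process to exit.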
  def monitor_all(queues)
    @monitor_all = Thread.new do
      GC.start
      loop do
        sleep 0.2
        total = queues.sum do |q|
          Sidekiq.redis { |conn| conn.llen q }
        end

        if total == 0
          ending = Time.now - @start
          size = @iter * @count * queues.length
          Sidekiq.logger.error("Done, #{size} jobs in #{ending} sec, #{(size / ending).to_i} jobs/sec")
          Sidekiq.logger.error("Ending RSS: #{Process.rss}")

          sleep 0.1
          @x.stop
          Process.kill("INT", $$)
          break
        end
      end
    end
  end

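  # Starts the appropriate monitor thread and the embedded Sidekiq instance,
  # then blocks reading the self-pipe so signals are handled on the main thread.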
  def run(queues, queue, monitor_all_queues)
    Sidekiq.logger.warn("Consuming from #{queue}")
    if monitor_all_queues
      monitor_all(queues)
    else
      monitor_single(queue)
    end

    @start = Time.now
    @x.run

    while (readable_io = IO.select([@self_read]))
      signal = readable_io.first[0].gets.strip
      handle_signal(signal)
    end
    # normal
  rescue Interrupt
  rescue => e
    raise e if $DEBUG
    warn e.message
    warn e.backtrace.join("\n")
    exit 1
  ensure
    @x.stop
  end
end

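# Helpers run inside each forked child: build a Loader, then either push work
# (setup) or consume it (consume).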
def setup(queue)
  ll = Loader.new
  ll.configure(queue)
  ll.setup(queue)
end

def consume(queues, queue, monitor_all_queues)
  ll = Loader.new
  ll.configure(queue)
  ll.run(queues, queue, monitor_all_queues)
end

# We assign one queue to each Sidekiq consumer process (round-robin when there
# are more processes than queues)
def run(number_of_processes, total_queues)
  read_stream, write_stream = IO.pipe
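  # NOTE: the forked children never write to this pipe, so `results` below ends
  # up empty; all output is reported via the logger instead.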

  queues = []
  (0..total_queues-1).each do |idx|
    queues.push("queue:q#{idx}")
  end

  Sidekiq.logger.info("Queues are: #{queues}")

  # Produce
  start = Time.now
  (0..total_queues-1).each do |idx|
    Process.fork do
      queue_num = "q#{idx}"
      setup(queue_num)
    end
  end

  queue_sz = $iterations * $elements * total_queues
  Process.waitall

  ending = Time.now - start
  #Sidekiq.logger.info("Pushed #{queue_sz} in #{ending} secs")

  # Consume
  (0..number_of_processes-1).each do |idx|
    Process.fork do
      # The first process consumes only from its own queue but monitors all queues.
      # It works as a synchronization point: once all processes finish
      # (that is, when all queues are emptied) it prints the stats.
      if idx == 0
        queue = "q#{idx}"
        consume(queues, queue, true)
      else
        queue = "q#{idx % total_queues}"
        consume(queues, queue, false)
      end
    end
  end

  Process.waitall
  write_stream.close
  results = read_stream.read
  read_stream.close
end

$total_processes = ENV["PROCESSES"] ? Integer(ENV["PROCESSES"]) : 8
$total_queues = ENV["QUEUES"] ? Integer(ENV["QUEUES"]) : 8

run($total_processes, $total_queues)