File: concurrent_consumers_stress_spec.rb

package info (click to toggle)
ruby-bunny 2.23.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,644 kB
  • sloc: ruby: 10,256; sh: 70; makefile: 8
file content (71 lines) | stat: -rw-r--r-- 1,778 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# -*- coding: utf-8 -*-
require "spec_helper"

unless ENV["CI"]
  describe "Concurrent consumers sharing a connection" do
    before :all do
      @connection = Bunny.new(username: "bunny_gem", password: "bunny_password", vhost: "bunny_testbed",
                    automatic_recovery: false, continuation_timeout: 45000)
      @connection.start
    end

    after :all do
      @connection.close
    end

    def any_not_drained?(qs)
      qs.any? { |q| !q.message_count.zero? }
    end

    context "when publishing thousands of messages over 128K in size" do
      let(:colors) { ["red", "blue", "white"] }

      let(:n) { 16 }
      let(:m) { 5000 }

      it "successfully drain all queues" do
        ch0  = @connection.create_channel
        ch0.confirm_select
        body = "абвг"
        x    = ch0.topic("bunny.stress.concurrent.consumers.topic", durable: true)

        chs  = {}
        n.times do |i|
          chs[i] = @connection.create_channel
        end
        qs   = []

        n.times do |i|
          t = Thread.new do
            cht = chs[i]

            q = cht.queue("", exclusive: true)
            q.bind(x.name, routing_key: colors.sample).subscribe do |delivery_info, meta, payload|
              # no-op
            end
            qs << q
          end
          t.abort_on_exception = true
        end

        sleep 1.0

        5.times do |i|
          m.times do
            x.publish(body, routing_key: colors.sample)
          end
          puts "Published #{(i + 1) * m} messages..."
          ch0.wait_for_confirms
        end

        while any_not_drained?(qs)
          sleep 1.0
        end
        puts "Drained all queues, winding down..."

        ch0.close
        chs.each { |_, ch| ch.close }
      end
    end
  end
end