File: fanout_exchange_routing_spec.rb

package info (click to toggle)
ruby-amqp 1.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 2,508 kB
  • sloc: ruby: 8,272; sh: 11; makefile: 10
file content (90 lines) | stat: -rw-r--r-- 2,250 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# encoding: utf-8

require "spec_helper"

describe AMQP::Exchange, "of type fanout" do

  #
  # Environment
  #

  include EventedSpec::AMQPSpec
  include EventedSpec::SpecHelper

  em_before { AMQP.cleanup_state }
  em_after  { AMQP.cleanup_state }

  default_options AMQP_OPTS
  default_timeout 6

  amqp_before do
    @channel   = AMQP::Channel.new
    @channel.should be_open
  end

  after(:all) do
    AMQP.cleanup_state
    done
  end


  #
  # Examples
  #


  context "with three bound queues" do
    it "routes all messages to all bound queues" do
      @exchange = @channel.fanout("amqpgem.integration.multicast.fanout", :auto_delete => true)
      @queue1   = @channel.queue("amqpgem.integration.multicast.queue1",  :auto_delete => true)
      @queue2   = @channel.queue("amqpgem.integration.multicast.queue2",  :auto_delete => true)
      @queue3   = @channel.queue("amqpgem.integration.multicast.queue3",  :auto_delete => true)

      @queues   = [@queue1, @queue2, @queue3]

      @received_messages = {
        @queue1.name => [],
        @queue2.name => [],
        @queue3.name => []
      }

      @expected_number_of_messages = {
        @queue1.name => 10,
        @queue2.name => 10,
        @queue3.name => 10
      }


      @sent_values = Array.new

      @queue1.bind(@exchange).subscribe do |payload|
        @received_messages[@queue1.name].push(payload.to_i)
      end # subscribe

      @queue2.bind(@exchange).subscribe do |payload|
        @received_messages[@queue2.name].push(payload.to_i)
      end # subscribe

      @queue3.bind(@exchange).subscribe do |payload|
        @received_messages[@queue3.name].push(payload.to_i)
      end # subscribe

      10.times do
        dispatched_data             = rand(5_000_000)
        @sent_values.push(dispatched_data)

        @exchange.publish(dispatched_data, :mandatory => false)
      end

      # for Rubinius, it is surprisingly slow on this workload
      done(1.5) {
        [@queue1, @queue2, @queue3].each do |q|
          @received_messages[q.name].size.should == @expected_number_of_messages[q.name]

          # this one is ordering assertion
          @received_messages[q.name].should == @sent_values
        end
      }
    end # it
  end
end # describe