File: amqp_spec.rb

package info (click to toggle)
ruby-em-synchrony 1.0.5-3.1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye
  • size: 572 kB
  • sloc: ruby: 3,458; sh: 37; sql: 7; makefile: 2
file content (144 lines) | stat: -rw-r--r-- 4,161 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
require "spec/helper/all"

describe EM::Synchrony::AMQP do

  # Declared with xit, so RSpec does not execute the body.
  # Connects inside the EM.synchrony block and expects the returned
  # connection to report itself as connected right away.
  xit "should yield until connection is ready" do
    EM.synchrony do
      amqp = EM::Synchrony::AMQP.connect
      is_up = amqp.connected?
      is_up.should eq(true)

      # Shut the reactor down so the example can return.
      EM.stop
    end
  end

  # Declared with xit, so RSpec does not execute the body.
  # Connects, disconnects, and expects connected? to be false immediately
  # after the disconnect call returns.
  xit "should yield until disconnection is complete" do
    EM.synchrony do
      amqp = EM::Synchrony::AMQP.connect
      amqp.disconnect

      still_up = amqp.connected?
      still_up.should eq(false)

      # Shut the reactor down so the example can return.
      EM.stop
    end
  end

  # Declared with xit, so RSpec does not execute the body.
  # Opens a channel on a fresh connection and checks the class of the
  # object handed back by Channel.new.
  xit "should yield until the channel is created" do
    EM.synchrony do
      amqp = EM::Synchrony::AMQP.connect
      channel_class = EM::Synchrony::AMQP::Channel

      opened = channel_class.new(amqp)
      opened.should be_kind_of(channel_class)

      # Shut the reactor down so the example can return.
      EM.stop
    end
  end

  # Declared with xit, so RSpec does not execute the body.
  # Builds an auto-delete queue directly through Queue.new on a fresh
  # channel and checks the class of the resulting object.
  xit "should yield until the queue is created" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      queue = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queue1", :auto_delete => true)
      # The queue used to be assigned and never inspected, leaving the example
      # without any expectation; assert on it like the channel example does.
      queue.should be_kind_of(EM::Synchrony::AMQP::Queue)
      EM.stop
    end
  end

  # Declared with xit, so RSpec does not execute the body.
  # Asks the channel itself for an auto-delete queue (instead of calling
  # Queue.new) and checks the class of the object it hands back.
  xit "should yield when a queue is created from a channel" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      queue = channel.queue("test.em-synchrony.queue1", :auto_delete => true)
      # The queue used to be assigned and never inspected, leaving the example
      # without any expectation.
      # NOTE(review): assumes Channel#queue returns the synchrony Queue wrapper
      # rather than a plain AMQP queue -- confirm against the library.
      queue.should be_kind_of(EM::Synchrony::AMQP::Queue)
      EM.stop
    end
  end

  # Declared with xit, so RSpec does not execute the body.
  # Creates one fanout exchange through Exchange.new, then requests every
  # exchange type from the channel twice under the same name, checking the
  # class of each object returned.
  xit "should yield until the exchange is created" do
    EM.synchrony do
      amqp = EM::Synchrony::AMQP.connect
      chan = EM::Synchrony::AMQP::Channel.new(amqp)
      exchange_class = EM::Synchrony::AMQP::Exchange

      explicit = exchange_class.new(chan, :fanout, "test.em-synchrony.exchange")
      explicit.should be_kind_of(exchange_class)

      %i[direct fanout topic headers].each do |kind|
        # Requesting the same name twice exercises the cached exchange code path
        pair = Array.new(2) { chan.send(kind, "test.em-synchrony.#{kind}") }
        pair.each { |declared| declared.should be_kind_of(exchange_class) }
      end

      # Shut the reactor down so the example can return.
      EM.stop
    end
  end

  # Declared with xit, so RSpec does not execute the body.
  # Binds two auto-delete queues to one fanout exchange, subscribes to both,
  # publishes a series of "Bonjour <n>" messages and stops the reactor once
  # the two subscribers together have counted twice the number published.
  xit "should publish and receive messages" do
    message_count = 10

    EM.synchrony do
      amqp = EM::Synchrony::AMQP.connect
      chan = EM::Synchrony::AMQP::Channel.new(amqp)
      fanout = EM::Synchrony::AMQP::Exchange.new(chan, :fanout, "test.em-synchrony.fanout")

      first_queue = chan.queue("test.em-synchrony.queues.1", auto_delete: true)
      second_queue = chan.queue("test.em-synchrony.queues.2", auto_delete: true)

      first_queue.bind(fanout)
      second_queue.bind(fanout)

      seen_first = 0
      seen_second = 0

      # Invoked after every delivery; ends the run when the combined tally
      # reaches 2 * message_count.
      finish_when_done = lambda do
        EM.stop if seen_first + seen_second == 2 * message_count
      end

      # First subscriber passes an explicit :ack => false option.
      first_queue.subscribe(ack: false) do |_meta, body|
        body.should match(/^Bonjour [0-9]+/)
        seen_first += 1
        finish_when_done.call
      end

      # Second subscriber relies on the default subscribe options.
      second_queue.subscribe do |_meta, body|
        body.should match(/^Bonjour [0-9]+/)
        seen_second += 1
        finish_when_done.call
      end

      # Publishing runs in its own Fiber, pausing 0.1s between messages.
      publisher = Fiber.new do
        message_count.times do |index|
          fanout.publish("Bonjour #{index}")
          EM::Synchrony.sleep(0.1)
        end
      end
      publisher.resume
    end
  end

  # Declared with xit, so RSpec does not execute the body.
  # Attaches two consumers to a single queue bound to a fanout exchange,
  # publishes nb_msg "Bonjour <n>" messages and, once the consumers have
  # counted nb_msg deliveries between them, expects both counts to be equal.
  xit "should handle several consumers" do
    nb_msg = 10
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      exchange = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.consumers.fanout")

      queue = channel.queue("test.em-synchrony.consumers.queue", :auto_delete => true)
      queue.bind(exchange)

      cons1 = EM::Synchrony::AMQP::Consumer.new(channel, queue)
      cons2 = EM::Synchrony::AMQP::Consumer.new(channel, queue)

      nb_cons1, nb_cons2 = 0, 0
      # Runs after every delivery; when all nb_msg messages are accounted for
      # it checks the split between the consumers and stops the reactor.
      # NOTE(review): the equality check presumes the broker alternates
      # deliveries evenly between the two consumers -- confirm.
      stop_cb = proc do
        if nb_cons1 + nb_cons2 == nb_msg
          nb_cons1.should eq(nb_cons2)
          EM.stop
        end
      end

      cons1.on_delivery do |_meta, msg|
        msg.should match(/^Bonjour [0-9]+/)
        nb_cons1 += 1
        stop_cb.call
      end.consume

      cons2.on_delivery do |_meta, msg|
        msg.should match(/^Bonjour [0-9]+/)
        nb_cons2 += 1
        stop_cb.call
      end.consume

      # Publish exactly nb_msg messages so the loop always agrees with the
      # stop condition above (the count was previously a hard-coded 10).
      nb_msg.times do |n|
        exchange.publish("Bonjour #{n}")
        EM::Synchrony.sleep(0.1)
      end
    end
  end
end