1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
|
# encoding: utf-8
require "spec_helper"
describe AMQP::Queue, "#pop" do
#
# Environment
#
# Mix in the evented-spec helpers, then apply the shared connection options
# and a default timeout of 10 for the examples in this group.
include EventedSpec::AMQPSpec
include EventedSpec::SpecHelper
default_options AMQP_OPTS
default_timeout 10

# Shared fixture: opens a channel, declares an auto-delete fanout exchange and
# an auto-delete queue, binds them together, and remembers the payload that
# the examples publish and expect back.
amqp_before do
  # Plain values first; they do not depend on the broker.
  @dispatched_data = "fetch me synchronously"
  @queue_name = "amqpgem.integration.basic.get.queue"

  @channel = AMQP::Channel.new
  @channel.should be_open

  # The exchange deliberately carries the same name as the queue.
  @exchange = @channel.fanout("amqpgem.integration.basic.get.queue", :auto_delete => true)
  @queue = @channel.queue(@queue_name, :auto_delete => true)
  @queue.bind(@exchange)
end
#
# Examples
#
# Popping from a purged queue must still invoke the callback, with nil as the
# payload.
context "when THERE ARE NO messages in the queue" do
  it "yields nil (instead of message payload) to the callback" do
    @queue.purge do
      pop_callback_ran = false

      # Confirm the queue reports zero messages once purged.
      @queue.status do |message_count, _consumer_count|
        message_count.should == 0
      end

      # Single block parameter: only the payload is inspected here.
      @queue.pop do |payload|
        pop_callback_ran = true
        @queue.delete
        payload.should be_nil
      end

      # Finish after 0.2 and make sure the pop callback was actually invoked,
      # so the example cannot pass without running its assertion.
      done(0.2) do
        pop_callback_ran.should be_true
      end
    end
  end
end
# Popping from a queue that holds a message must hand that message's payload
# to the callback.
context "when THERE ARE messages in the queue" do
  it "yields message payload to the callback" do
    # Tracks whether the pop callback ran at all; without it the example
    # would pass vacuously when the callback is never invoked.
    callback_has_fired = false

    @channel.default_exchange.publish(@dispatched_data, :routing_key => @queue.name)

    # Two block parameters: headers (unused here) and the payload.
    @queue.pop do |headers, payload|
      callback_has_fired = true
      payload.should == @dispatched_data
    end

    delayed(0.5) {
      # Queue.Get doesn't qualify for subscription, hence, manual deletion is required
      @queue.delete
    }

    # Same guard as the empty-queue example: the payload assertion above only
    # counts if the callback was really called.
    done(0.8) {
      callback_has_fired.should be_true
    }
  end # it
end # context
# With :ack => true and no acknowledgement ever sent, the fetched message is
# expected to still be counted in the queue after the fetching channel closes.
context "with manual acknowledgements" do
  default_timeout 4

  # Random suffix on the queue name for each example.
  let(:queue_name) { "amqpgem.integration.basic.get.acks.manual#{rand}" }

  it "does not remove messages from the queue unless ack-ed" do
    fetching_channel = AMQP::Channel.new
    checking_channel = AMQP::Channel.new

    # Print channel-level errors so a failure is easier to diagnose.
    fetching_channel.on_error do |_channel, channel_close|
      puts "Channel error: #{channel_close.reply_code} #{channel_close.reply_text}"
    end

    queue = fetching_channel.queue(queue_name, :exclusive => true)
    exchange = fetching_channel.default_exchange
    queue.purge

    # Publish a little later than the purge.
    delayed(0.2) do
      exchange.publish(@dispatched_data, :routing_key => queue.name)
    end

    delayed(0.5) do
      queue.pop(:ack => true) do |_meta, _payload|
        # intentionally empty: the message is never acknowledged
      end
      fetching_channel.close

      # After a further 0.7, inspect the queue through the second channel.
      EventMachine.add_timer(0.7) do
        checking_channel.queue(queue_name, :exclusive => true).status do |message_count, _consumer_count|
          message_count.should == 1
          done
        end
      end
    end
  end
end
# With :ack => false the fetched message is expected to be gone from the queue
# once it has been delivered, without any acknowledgement from the client.
context "with automatic acknowledgements" do
  default_timeout 4

  # Random suffix on the queue name for each example.
  let(:queue_name) { "amqpgem.integration.basic.get.acks.automatic#{rand}" }

  it "does remove messages from the queue after delivery" do
    fetching_channel = AMQP::Channel.new
    checking_channel = AMQP::Channel.new

    # Print channel-level errors so a failure is easier to diagnose.
    fetching_channel.on_error do |_channel, channel_close|
      puts "Channel error: #{channel_close.reply_code} #{channel_close.reply_text}"
    end

    queue = fetching_channel.queue(queue_name, :exclusive => true)
    exchange = fetching_channel.default_exchange
    queue.purge
    # Published immediately after the purge, on the same channel.
    exchange.publish(@dispatched_data, :routing_key => queue.name)

    delayed(0.5) do
      queue.pop(:ack => false) do |_meta, _payload|
        # intentionally empty: no explicit acknowledgement is sent in this mode
      end
      fetching_channel.close

      # After a further 0.5, inspect the queue through the second channel.
      EventMachine.add_timer(0.5) do
        checking_channel.queue(queue_name, :exclusive => true).status do |message_count, _consumer_count|
          message_count.should == 0
          done
        end
      end
    end
  end
end
end # describe
|