1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
|
require "spec_helper"
# Integration specs for Bunny::Channel#basic_consume.
#
# Every example uses the connection opened in before(:all), which is configured
# for the "bunny_testbed" vhost with the "bunny_gem" user.
describe Bunny::Channel, "#basic_consume" do
  before(:all) do
    @connection = Bunny.new(user: "bunny_gem", password: "bunny_password", vhost: "bunny_testbed")
    @connection.start
  end

  after(:all) do
    # @connection is nil when before(:all) failed before assigning it; guard so
    # the teardown does not hide the original error behind a NoMethodError.
    @connection.close if @connection && @connection.open?
  end

  # Yields a fresh channel on the shared connection and closes it afterwards,
  # even when an expectation inside the block fails, so a failing example does
  # not leave its channel open.
  def with_channel
    ch = @connection.create_channel
    yield ch
  ensure
    # The channel may already be closed (or never have been created).
    ch.close if ch && ch.open?
  end

  it "returns basic.consume-ok when it is received" do
    with_channel do |ch|
      q = ch.queue("", exclusive: true)
      consume_ok = ch.basic_consume(q)

      expect(consume_ok).to be_instance_of(AMQ::Protocol::Basic::ConsumeOk)
      expect(consume_ok.consumer_tag).not_to be_nil
    end
  end

  it "carries server-generated consumer tag with basic.consume-ok" do
    with_channel do |ch|
      q = ch.queue("", exclusive: true)
      # An empty consumer tag is passed explicitly; the example expects the
      # tag that comes back to contain "amq.ctag".
      consume_ok = ch.basic_consume(q, "")

      # Parenthesised so the regexp literal is not parsed ambiguously.
      expect(consume_ok.consumer_tag).to match(/amq\.ctag.*/)
    end
  end

  context "with automatic acknowledgement mode" do
    # rand suffix presumably keeps separate runs from sharing a queue.
    let(:queue_name) { "bunny.basic_consume#{rand}" }

    it "causes messages to be automatically removed from the queue after delivery" do
      delivered_keys = []
      delivered_data = []
      consumer_ch = nil

      t = Thread.new do
        consumer_ch = @connection.create_channel
        q = consumer_ch.queue(queue_name, auto_delete: true, durable: false)
        # Positional arguments after the tag are no_ack (true here) and
        # exclusive, per Bunny's Channel#basic_consume signature.
        consumer_ch.basic_consume(q, "", true, false) do |delivery_info, _properties, payload|
          delivered_keys << delivery_info.routing_key
          delivered_data << payload
        end
      end
      t.abort_on_exception = true
      # Crude wait for the consumer thread to finish subscribing.
      sleep 0.5

      begin
        with_channel do |ch|
          ch.default_exchange.publish("hello", routing_key: queue_name)
          # Crude wait for the delivery handler to run.
          sleep 0.7

          expect(delivered_keys).to include(queue_name)
          expect(delivered_data).to include("hello")
          expect(ch.queue(queue_name, auto_delete: true, durable: false).message_count).to eq(0)
        end
      ensure
        # Close the consumer's channel as well so it does not stay open for
        # the rest of the suite.
        consumer_ch.close if consumer_ch && consumer_ch.open?
      end
    end
  end

  context "with manual acknowledgement mode" do
    # rand suffix presumably keeps separate runs from sharing a queue.
    let(:queue_name) { "bunny.basic_consume#{rand}" }

    it "waits for an explicit acknowledgement" do
      delivered_keys = []
      delivered_data = []
      consumer_ch = nil

      t = Thread.new do
        consumer_ch = @connection.create_channel
        q = consumer_ch.queue(queue_name, auto_delete: true, durable: false)
        # no_ack is false here: the handler never acknowledges the delivery
        # and closes its own channel instead.
        consumer_ch.basic_consume(q, "", false, false) do |delivery_info, _properties, payload|
          delivered_keys << delivery_info.routing_key
          delivered_data << payload

          consumer_ch.close
        end
      end
      t.abort_on_exception = true
      # Crude wait for the consumer thread to finish subscribing.
      sleep 0.5

      begin
        with_channel do |ch|
          ch.default_exchange.publish("hello", routing_key: queue_name)
          # Crude wait for the delivery handler to run.
          sleep 0.7

          expect(delivered_keys).to include(queue_name)
          expect(delivered_data).to include("hello")
          # NOTE(review): the message is never acked, yet the count is expected
          # to be 0. Presumably the auto-delete queue is removed once its only
          # consumer's channel closes and this call re-declares it empty;
          # confirm against broker semantics.
          expect(ch.queue(queue_name, auto_delete: true, durable: false).message_count).to eq(0)
        end
      ensure
        # Only reached with an open channel when the handler never ran.
        consumer_ch.close if consumer_ch && consumer_ch.open?
      end
    end
  end
end
|