1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
|
require "spec_helper"
# Integration examples for Bunny::Consumer#cancel. Every example opens a real
# connection with Bunny.new(...).start, so a reachable broker that accepts the
# credentials and vhost used below is required.
#
# Each example performs its channel/queue/consumer setup on a separate thread
# and joins that thread before continuing. Joining (instead of sleeping for a
# fixed interval) guarantees that the setup has finished, that any variable the
# thread assigns is visible, and that an exception or failed expectation raised
# inside the thread is re-raised in the example itself.
describe Bunny::Consumer, "#cancel" do
  # Memoized per example; the connection is started on first access.
  let(:connection) do
    c = Bunny.new(username: "bunny_gem", password: "bunny_password", vhost: "bunny_testbed")
    c.start
    c
  end

  # Randomized so that examples never share a queue.
  let(:queue_name) { "bunny.queues.#{rand}" }

  after :each do
    connection.close if connection.open?
  end

  context "with a non-blocking consumer" do
    it "cancels the consumer" do
      delivered_data = []

      t = Thread.new do
        ch = connection.create_channel
        q = ch.queue(queue_name, auto_delete: true, durable: false)
        consumer = q.subscribe do |_, _, payload|
          delivered_data << payload
        end

        expect(consumer.consumer_tag).not_to be_nil
        cancel_ok = consumer.cancel
        expect(cancel_ok.consumer_tag).to eq consumer.consumer_tag

        ch.close
      end
      t.abort_on_exception = true
      # Wait for the subscribe/cancel/close sequence to complete before
      # publishing; also surfaces a failure from the expectations above.
      t.join

      ch = connection.create_channel
      ch.default_exchange.publish("", routing_key: queue_name)
      # Leave time for a delivery to arrive if the consumer were still active.
      sleep 0.7

      expect(delivered_data).to be_empty
    end
  end

  context "with a blocking consumer" do
    it "cancels the consumer" do
      delivered_data = []
      consumer = nil

      t = Thread.new do
        ch = connection.create_channel
        q = ch.queue(queue_name, auto_delete: true, durable: false)

        consumer = Bunny::Consumer.new(ch, q)
        consumer.on_delivery do |_, _, payload|
          delivered_data << payload
        end

        # Called without block: true, which per Bunny's documentation means
        # the call returns once the consumer is registered.
        q.subscribe_with(consumer)
      end
      t.abort_on_exception = true
      # Ensures `consumer` has been assigned before it is cancelled below.
      t.join

      consumer.cancel
      # Let the cancellation settle before publishing.
      sleep 1.0

      ch = connection.create_channel
      ch.default_exchange.publish("", routing_key: queue_name)
      # Leave time for a delivery to arrive if the consumer were still active.
      sleep 0.7

      expect(delivered_data).to be_empty
    end
  end

  context "with a worker pool shutdown timeout configured" do
    it "processes the message if processing completes within the timeout" do
      delivered_data = []
      consumer = nil

      t = Thread.new do
        # Positional arguments, per Bunny's Session#create_channel: channel
        # number, consumer pool size, consumer pool abort-on-exception flag,
        # consumer pool shutdown timeout in seconds (5 here).
        ch = connection.create_channel(nil, 1, false, 5)
        q = ch.queue(queue_name, auto_delete: true, durable: false)

        consumer = Bunny::Consumer.new(ch, q)
        consumer.on_delivery do |_, _, payload|
          # Simulated work that is shorter than the 5 second shutdown timeout.
          sleep 2
          delivered_data << payload
        end

        q.subscribe_with(consumer)
      end
      t.abort_on_exception = true
      # Ensures the consumer is registered and `consumer` is assigned.
      t.join

      ch = connection.create_channel
      ch.confirm_select
      ch.default_exchange.publish("", routing_key: queue_name)
      ch.wait_for_confirms
      # Give the handler time to pick the message up before cancelling.
      sleep 0.5

      consumer.cancel
      sleep 1.0

      expect(delivered_data).to_not be_empty
    end

    it "kills the consumer if processing takes longer than the timeout" do
      delivered_data = []
      consumer = nil

      t = Thread.new do
        # Same positional arguments as in the example above, with a 1 second
        # consumer pool shutdown timeout.
        ch = connection.create_channel(nil, 1, false, 1)
        q = ch.queue(queue_name, auto_delete: true, durable: false)

        consumer = Bunny::Consumer.new(ch, q)
        consumer.on_delivery do |_, _, payload|
          # Simulated work that is longer than the 1 second shutdown timeout,
          # so the append below is expected never to run.
          sleep 3
          delivered_data << payload
        end

        q.subscribe_with(consumer)
      end
      t.abort_on_exception = true
      # Ensures the consumer is registered and `consumer` is assigned.
      t.join

      ch = connection.create_channel
      ch.confirm_select
      ch.default_exchange.publish("", routing_key: queue_name)
      ch.wait_for_confirms
      # Give the handler time to pick the message up before cancelling.
      sleep 0.5

      consumer.cancel
      sleep 1.0

      expect(delivered_data).to be_empty
    end
  end
end
|