1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
|
# encoding: utf-8
require 'spec_helper'
describe "Messages published before AMQP transaction commits" do
  # Pulls in the evented-spec AMQP helpers used below (amqp_before, done,
  # default_timeout).
  include EventedSpec::AMQPSpec
  default_timeout 1.5

  # Publisher and consumer each get their own channel, so putting the
  # producer channel into transactional mode does not touch the consumer.
  amqp_before do
    @producer_channel = AMQP::Channel.new
    @consumer_channel = AMQP::Channel.new
  end

  it "are not accessible to AMQP consumers" do
    fanout = @producer_channel.fanout("amq.fanout")
    inbox = @consumer_channel.queue("", :exclusive => true)

    # Any delivery at all means an uncommitted message leaked to the consumer.
    bound_inbox = inbox.bind(fanout)
    bound_inbox.subscribe { |_metadata, _payload| fail "Consumer received a message before transaction committed" }

    @producer_channel.tx_select

    # Publish inside the open transaction; tx_commit is deliberately never
    # called, so nothing should reach the consumer.
    EventMachine.add_timer(0.5) do
      50.times do
        fanout.publish("before tx.commit")
      end
    end

    # Finish the example after 1.2 seconds without a delivery (this stays
    # below the 1.5 default timeout declared above).
    done(1.2)
  end
end
describe "AMQP transaction commit" do
  # Pulls in the evented-spec AMQP helpers used below (amqp_before, done,
  # default_timeout).
  include EventedSpec::AMQPSpec
  default_timeout 1.5

  # Publisher and consumer each get their own channel, so only the
  # producer channel is put into transactional mode.
  amqp_before do
    @producer_channel = AMQP::Channel.new
    @consumer_channel = AMQP::Channel.new
  end

  it "causes messages published since the last tx.select to be delivered to AMQP consumers" do
    exchange = @producer_channel.fanout("amq.fanout")
    queue = @consumer_channel.queue("", :exclusive => true)

    # Record the delivery so the delayed `done` at the bottom can tell a real
    # success from "nothing ever arrived".
    delivered = false
    queue.bind(exchange).subscribe do |_metadata, _payload|
      delivered = true
      done
    end

    @producer_channel.tx_select

    # Publish inside the transaction, then commit it.
    EventMachine.add_timer(0.5) do
      50.times { exchange.publish("before tx.commit") }
      @producer_channel.tx_commit
    end

    # Fallback finish: without this guard the example would pass after 1.2
    # seconds even if the commit delivered nothing.
    done(1.2) { fail "No message was delivered to the consumer after tx.commit" unless delivered }
  end
end
describe "AMQP transaction commit attempt on a non-transactional channel" do
  # Pulls in the evented-spec AMQP helpers used below (amqp_before, done,
  # default_timeout).
  include EventedSpec::AMQPSpec
  default_timeout 1.5

  # Separate channels, so an error raised on the producer channel is not
  # raised on the channel the consumer uses.
  amqp_before do
    @producer_channel = AMQP::Channel.new
    @consumer_channel = AMQP::Channel.new
  end

  it "causes channel-level exception" do
    exchange = @producer_channel.fanout("amq.fanout")
    queue = @consumer_channel.queue("", :exclusive => true)

    # Nothing is published in this example, so any delivery is unexpected.
    queue.bind(exchange).subscribe do |_metadata, _payload|
      fail "Consumer received an unexpected message"
    end

    # Record the channel error so the delayed `done` at the bottom can tell a
    # real success from "no error was ever raised".
    channel_error_raised = false
    @producer_channel.on_error do |_ch, channel_close|
      channel_error_raised = true
      puts "#{channel_close.reply_text}"
      done
    end

    # tx_select was never called on this channel, so the commit is expected
    # to trigger the on_error handler above.
    EventMachine.add_timer(0.5) { @producer_channel.tx_commit }

    # Fallback finish: without this guard the example would pass after 1.2
    # seconds even if no channel-level exception occurred.
    done(1.2) { fail "tx.commit on a non-transactional channel did not raise a channel-level exception" unless channel_error_raised }
  end
end
|