File: consumer_cancellation_notification_spec.rb

package info (click to toggle)
ruby-bunny 2.23.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,644 kB
  • sloc: ruby: 10,256; sh: 70; makefile: 8
file content (128 lines) | stat: -rw-r--r-- 3,010 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
require "spec_helper"

describe Bunny::Channel do
  # Builds a session against the "bunny_testbed" vhost using the suite's test
  # credentials, starts it, and hands the started session to the example.
  let(:connection) do
    Bunny.new(
      username: "bunny_gem",
      password: "bunny_password",
      vhost: "bunny_testbed"
    ).tap(&:start)
  end

  # Closes the connection after every example, but only while it still reports open.
  after(:each) { connection.close if connection.open? }

  context "with implicit consumer construction" do
    # The random suffix (via `rand`) gives each example its own queue name.
    let(:queue_name) { "basic.consume#{rand}" }

    # Subscribes on a second channel with an :on_cancellation callback, deletes
    # the queue from the first channel, and checks that the callback flipped
    # the local flag.
    it "supports consumer cancellation notifications" do
      cancelled = false
      publisher_channel = connection.create_channel

      subscriber = Thread.new do
        consumer_channel = connection.create_channel
        queue = consumer_channel.queue(queue_name, auto_delete: true)

        # A plain (non-lambda) proc, so whatever argument the callback is
        # invoked with is ignored.
        queue.subscribe(on_cancellation: proc { cancelled = true })
      end
      subscriber.abort_on_exception = true

      # Give the subscriber thread time to declare the queue and subscribe.
      sleep 0.5
      publisher_channel.default_exchange.publish("abc", routing_key: queue_name)

      # Delete the queue out from under the consumer; the expectation below
      # relies on this firing the cancellation callback.
      sleep 0.5
      publisher_channel.queue(queue_name, auto_delete: true).delete

      # Leave time for the callback to run before asserting.
      sleep 0.5
      expect(cancelled).to eq(true)

      publisher_channel.close
    end
  end


  context "with explicit consumer construction" do
    # Consumer subclass that records whether #handle_cancellation was invoked.
    class ExampleConsumer < Bunny::Consumer
      # Marks this consumer as cancelled; the argument is ignored.
      def handle_cancellation(_basic_cancel)
        @cancelled = true
      end

      # @return the stored @cancelled flag (set to true by #handle_cancellation)
      def cancelled?
        @cancelled
      end
    end

    # The random suffix (via `rand`) gives each example its own queue name.
    let(:queue_name) { "basic.consume#{rand}" }

    # Registers an ExampleConsumer on a second channel, deletes the queue from
    # the first channel, and checks that the consumer reports itself cancelled.
    it "supports consumer cancellation notifications" do
      consumer = nil
      publisher_channel = connection.create_channel

      subscriber = Thread.new do
        consumer_channel = connection.create_channel
        queue = consumer_channel.queue(queue_name, auto_delete: true)

        consumer = ExampleConsumer.new(consumer_channel, queue, "")
        queue.subscribe_with(consumer)
      end
      subscriber.abort_on_exception = true

      # Give the subscriber thread time to build and register the consumer.
      sleep 0.5
      publisher_channel.default_exchange.publish("abc", routing_key: queue_name)

      # Delete the queue out from under the consumer; the expectation below
      # relies on this reaching #handle_cancellation.
      sleep 0.5
      publisher_channel.queue(queue_name, auto_delete: true).delete

      # Leave time for the cancellation to be handled before asserting.
      sleep 0.5
      expect(consumer).to be_cancelled

      publisher_channel.close
    end
  end



  context "with consumer re-registration" do
    # Consumer that reacts to cancellation by pointing itself at another queue
    # and registering with its channel again.
    class ExampleConsumerThatReregisters < Bunny::Consumer
      # Declares the "basic.consume.after_cancellation" queue on @channel,
      # stores it in @queue, and re-registers this consumer through
      # basic_consume_with. The cancellation argument is ignored.
      def handle_cancellation(_)
        @queue = @channel.queue("basic.consume.after_cancellation", auto_delete: true)
        @channel.basic_consume_with(self)
      end
    end

    # The random suffix (via `rand`) gives each example its own queue name.
    let(:queue_name) { "basic.consume#{rand}" }

    # Publishes one message to a re-registering consumer, deletes its queue,
    # publishes again, and checks that exactly one payload was collected.
    it "works correctly" do
      consumer = nil
      xs       = []

      ch = connection.create_channel
      t  = Thread.new do
        ch2 = connection.create_channel
        q   = ch2.queue(queue_name, auto_delete: true)

        consumer = ExampleConsumerThatReregisters.new(ch2, q, "")
        # Collect every delivered payload so the example can inspect them.
        consumer.on_delivery do |_, _, payload|
          xs << payload
        end
        q.subscribe_with(consumer)
      end
      t.abort_on_exception = true

      # Give the subscriber thread time to register the consumer.
      sleep 0.5
      x = ch.default_exchange
      x.publish("abc", routing_key: queue_name)

      # Delete the queue; presumably this is what drives the consumer into
      # #handle_cancellation and its re-registration.
      sleep 0.5
      ch.queue(queue_name, auto_delete: true).delete

      # NOTE(review): this publish is routed to the queue that was just
      # deleted, and the expectation below allows only one payload, so the
      # re-registered consumer is never shown to receive anything. Confirm
      # that is the intent.
      x.publish("abc", routing_key: queue_name)
      sleep 0.5
      # Declared for its side effect only; the returned queue object is unused.
      ch.queue("basic.consume.after_cancellation", auto_delete: true)
      expect(xs).to eq ["abc"]

      ch.close
    end
  end
end