File: basic_consume_spec.rb

package info (click to toggle)
ruby-bunny 2.23.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,644 kB
  • sloc: ruby: 10,256; sh: 70; makefile: 8
file content (99 lines) | stat: -rw-r--r-- 2,824 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
require "spec_helper"

# Integration specs for Bunny::Channel#basic_consume.
#
# All examples share one connection, opened once for the group with the
# "bunny_gem" user on the "bunny_testbed" vhost; each example opens (and
# releases) its own channel(s) on that connection.
describe Bunny::Channel, "#basic_consume" do
  before(:all) do
    @connection = Bunny.new(user: "bunny_gem", password: "bunny_password", vhost: "bunny_testbed")
    @connection.start
  end

  after(:all) do
    # Guard against @connection never having been assigned (for example if
    # Bunny.new raised in the before hook), so this hook does not pile a
    # NoMethodError on top of the original failure.
    @connection.close if @connection && @connection.open?
  end

  it "returns basic.consume-ok when it is received" do
    ch = @connection.create_channel
    begin
      q = ch.queue("", exclusive: true)

      consume_ok = ch.basic_consume(q)
      expect(consume_ok).to be_instance_of(AMQ::Protocol::Basic::ConsumeOk)
      expect(consume_ok.consumer_tag).not_to be_nil
    ensure
      # Release the channel even when an expectation above fails.
      ch.close if ch.open?
    end
  end

  it "carries server-generated consumer tag with basic.consume-ok" do
    ch = @connection.create_channel
    begin
      q = ch.queue("", exclusive: true)

      # An empty consumer tag is passed, so the tag in the reply comes from the server.
      consume_ok = ch.basic_consume(q, "")
      # Parenthesised so Ruby does not warn about an ambiguous regexp literal argument.
      expect(consume_ok.consumer_tag).to match(/amq\.ctag.*/)
    ensure
      ch.close if ch.open?
    end
  end

  context "with automatic acknowledgement mode" do
    let(:queue_name) { "bunny.basic_consume#{rand}" }

    it "causes messages to be automatically removed from the queue after delivery" do
      # Resolve the memoized name once, before any thread is started.
      name           = queue_name
      delivered_keys = []
      delivered_data = []
      consumer_ch    = nil
      ch             = nil

      begin
        # The consumer is registered from a separate thread, on its own channel.
        consumer = Thread.new do
          consumer_ch = @connection.create_channel
          q = consumer_ch.queue(name, auto_delete: true, durable: false)
          # Third positional argument is true in this (automatic acknowledgement) context.
          consumer_ch.basic_consume(q, "", true, false) do |delivery_info, _properties, payload|
            delivered_keys << delivery_info.routing_key
            delivered_data << payload
          end
        end
        # basic_consume returns the broker's consume-ok reply (see the first
        # example), so once the thread has finished the consumer is in place.
        # Joining also re-raises here anything the thread raised.
        consumer.join

        ch = @connection.create_channel
        x  = ch.default_exchange
        x.publish("hello", routing_key: name)

        # Leave time for the message to be routed and handed to the consumer block.
        sleep 0.7
        expect(delivered_keys).to include(name)
        expect(delivered_data).to include("hello")

        expect(ch.queue(name, auto_delete: true, durable: false).message_count).to eq 0
      ensure
        # Close both the publishing and the consuming channel, pass or fail.
        [ch, consumer_ch].each { |c| c.close if c && c.open? }
      end
    end
  end

  context "with manual acknowledgement mode" do
    let(:queue_name) { "bunny.basic_consume#{rand}" }

    it "waits for an explicit acknowledgement" do
      # Resolve the memoized name once, before any thread is started.
      name           = queue_name
      delivered_keys = []
      delivered_data = []
      consumer_ch    = nil
      ch             = nil

      begin
        # The consumer is registered from a separate thread, on its own channel.
        consumer = Thread.new do
          consumer_ch = @connection.create_channel
          q = consumer_ch.queue(name, auto_delete: true, durable: false)
          # Third positional argument is false in this (manual acknowledgement) context.
          consumer_ch.basic_consume(q, "", false, false) do |delivery_info, _properties, payload|
            delivered_keys << delivery_info.routing_key
            delivered_data << payload

            # The delivery is never acknowledged; the consuming channel is
            # closed from inside the handler instead.
            consumer_ch.close
          end
        end
        # See the automatic-acknowledgement example: joining guarantees the
        # consumer is registered and surfaces any error raised in the thread.
        consumer.join

        ch = @connection.create_channel
        x  = ch.default_exchange
        x.publish("hello", routing_key: name)

        # Leave time for the delivery and for the handler's channel close.
        sleep 0.7
        expect(delivered_keys).to include(name)
        expect(delivered_data).to include("hello")

        # NOTE(review): presumably the auto-delete queue is removed once its only
        # consumer goes away, so this re-declaration reports an empty queue — confirm.
        expect(ch.queue(name, auto_delete: true, durable: false).message_count).to eq 0
      ensure
        # consumer_ch is normally already closed by the handler; open? skips it then.
        [ch, consumer_ch].each { |c| c.close if c && c.open? }
      end
    end
  end
end