File: consumer.rb

package info (click to toggle)
ruby-amqp 1.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 2,508 kB
  • sloc: ruby: 8,272; sh: 11; makefile: 10
file content (379 lines) | stat: -rw-r--r-- 11,912 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
# encoding: utf-8

require "amqp/consumer_tag_generator"

module AMQP
  # AMQP consumers are entities that handle messages delivered to them ("push API" as opposed to "pull API") by AMQP broker.
  # Every consumer is associated with a queue. Consumers can be exclusive (no other consumers can be registered for the same queue)
  # or not (consumers share the queue). In the case of multiple consumers per queue, messages are distributed in round robin
  # manner with respect to channel-level prefetch setting).
  #
  # @see AMQP::Queue
  # @see AMQP::Queue#subscribe
  # @see AMQP::Queue#cancel
  class Consumer

    #
    # Behaviours
    #

    include Callbacks
    extend  ProtocolMethodHandlers


    #
    # API
    #

    # @return [AMQP::Channel] Channel this consumer uses
    attr_reader :channel
    # @return [AMQP::Queue] Queue messages are consumed from
    attr_reader :queue
    # @return [String] Consumer tag, unique consumer identifier
    attr_reader :consumer_tag
    # @return [Hash] Custom subscription metadata
    attr_reader :arguments


    # @return [AMQP::ConsumerTagGenerator] Consumer tag generator
    def self.tag_generator
      @tag_generator ||= AMQP::ConsumerTagGenerator.new
    end # self.tag_generator

    # @param [AMQP::ConsumerTagGenerator] Assigns consumer tag generator that will be used by consumer instances
    # @return [AMQP::ConsumerTagGenerator] Provided argument
    def self.tag_generator=(generator)
      @tag_generator = generator
    end


    def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false, &block)
      @callbacks    = Hash.new

      @channel       = channel            || raise(ArgumentError, "channel is nil")
      @connection    = channel.connection || raise(ArgumentError, "connection is nil")
      @queue         = queue        || raise(ArgumentError, "queue is nil")
      @consumer_tag  = consumer_tag || self.class.tag_generator.generate_for(queue)
      @exclusive     = exclusive
      @no_ack        = no_ack
      @arguments     = arguments

      @no_local     = no_local

      self.register_with_channel
      self.register_with_queue
    end # initialize

    # @return [Boolean] true if this consumer is exclusive (other consumers for the same queue are not allowed)
    def exclusive?
      !!@exclusive
    end # exclusive?



    # Begin consuming messages from the queue
    # @return [AMQP::Consumer] self
    def consume(nowait = false, &block)
      @channel.once_open do
        @queue.once_declared do
          @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, nowait, @arguments))

          if !nowait
            self.redefine_callback(:consume, &block)
            @channel.consumers_awaiting_consume_ok.push(self)
          end

          self
        end
      end

      self
    end # consume(nowait = false, &block)

    # Used by automatic recovery code.
    # @api plugin
    # @return [AMQP::Consumer] self
    def resubscribe(&block)
      @channel.once_open do
        @queue.once_declared do
          self.unregister_with_channel
          @consumer_tag = self.class.tag_generator.generate_for(@queue)
          self.register_with_channel

          @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, block.nil?, @arguments))
          self.redefine_callback(:consume, &block) if block

          self
        end
      end

      self
    end # resubscribe(&block)

    # @return [AMQP::Consumer] self
    def cancel(nowait = false, &block)
      @channel.once_open do
        @queue.once_declared do
          @connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait))
          if !nowait
            self.redefine_callback(:cancel, &block)
            @channel.consumers_awaiting_cancel_ok.push(self)
          end

          self
        end
      end

      self
    end # cancel(nowait = false, &block)

    # {AMQP::Queue} API compatibility.
    #
    # @return [Boolean] true if this consumer is active (subscribed for message delivery)
    # @api public
    def subscribed?
      !@callbacks[:delivery].empty?
    end # subscribed?

    # Legacy {AMQP::Queue} API compatibility.
    # @private
    # @deprecated
    def callback
      if @callbacks[:delivery]
        @callbacks[:delivery].first
      end
    end # callback


    # Register a block that will be used to handle delivered messages.
    #
    # @return [AMQP::Consumer] self
    # @see AMQP::Queue#subscribe
    def on_delivery(&block)
      # We have to maintain this multiple arities jazz
      # because older versions this gem are used in examples in at least 3
      # books published by O'Reilly :(. MK.
      delivery_shim = Proc.new { |basic_deliver, headers, payload|
        case block.arity
        when 1 then
          block.call(payload)
        when 2 then
          h = Header.new(@channel, basic_deliver, headers.decode_payload)
          block.call(h, payload)
        else
          h = Header.new(@channel, basic_deliver, headers.decode_payload)
          block.call(h, payload, basic_deliver.consumer_tag, basic_deliver.delivery_tag, basic_deliver.redelivered, basic_deliver.exchange, basic_deliver.routing_key)
        end
      }

      self.append_callback(:delivery, &delivery_shim)

      self
    end # on_delivery(&block)


    # @return [String] Readable representation of relevant object state.
    def inspect
      "#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}"
    end # inspect


    def on_cancel(&block)
      self.append_callback(:scancel, &block)

      self
    end # on_cancel(&block)

    def handle_cancel(basic_cancel)
      self.exec_callback(:scancel, basic_cancel)
    end # handle_cancel(basic_cancel)



    # @group Acknowledging & Rejecting Messages

    # Acknowledge a delivery tag.
    # @return [Consumer]  self
    #
    # @api public
    # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
    def acknowledge(delivery_tag)
      @channel.acknowledge(delivery_tag)

      self
    end # acknowledge(delivery_tag)

    #
    # @return [Consumer]  self
    #
    # @api public
    # @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
    def reject(delivery_tag, requeue = true)
      @channel.reject(delivery_tag, requeue)

      self
    end # reject(delivery_tag, requeue = true)

    # Defines a callback that will be executed when AMQP connection is recovered after a network failure..
    # Only one callback can be defined (the one defined last replaces previously added ones).
    #
    # @api public
    # Defines a callback that will be executed when AMQP connection is recovered after a network failure..
    # Only one callback can be defined (the one defined last replaces previously added ones).
    #
    # @api public
    def on_recovery(&block)
      self.redefine_callback(:after_recovery, &block)
    end # on_recovery(&block)
    alias after_recovery on_recovery

    # @endgroup


    # @group Error Handling & Recovery

    # Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
    # Only one callback can be defined (the one defined last replaces previously added ones).
    #
    # @api public
    def on_connection_interruption(&block)
      self.redefine_callback(:after_connection_interruption, &block)
    end # on_connection_interruption(&block)
    alias after_connection_interruption on_connection_interruption

    # @private
    def handle_connection_interruption(method = nil)
      self.exec_callback_yielding_self(:after_connection_interruption)
    end # handle_connection_interruption


    # Defines a callback that will be executed after TCP connection is recovered after a network failure
    # but before AMQP connection is re-opened.
    # Only one callback can be defined (the one defined last replaces previously added ones).
    #
    # @api public
    def before_recovery(&block)
      self.redefine_callback(:before_recovery, &block)
    end # before_recovery(&block)

    # @private
    def run_before_recovery_callbacks
      self.exec_callback_yielding_self(:before_recovery)
    end

    # @private
    def run_after_recovery_callbacks
      self.exec_callback_yielding_self(:after_recovery)
    end



    # Called by associated connection object when AMQP connection has been re-established
    # (for example, after a network failure).
    #
    # @api plugin
    def auto_recover
      self.exec_callback_yielding_self(:before_recovery)
      self.resubscribe
      self.exec_callback_yielding_self(:after_recovery)
    end # auto_recover

    # @endgroup


    def to_s
      "#<#{self.class.name} @consumer_tag=#{@consumer_tag} @queue=#{@queue.name} @channel=#{@channel.id}>"
    end


    #
    # Implementation
    #

    def handle_delivery(basic_deliver, metadata, payload)
      self.exec_callback(:delivery, basic_deliver, metadata, payload)
    end # handle_delivery(basic_deliver, metadata, payload)

    def handle_consume_ok(consume_ok)
      self.exec_callback_once(:consume, consume_ok)
    end # handle_consume_ok(consume_ok)

    def handle_cancel_ok(cancel_ok)
      self.exec_callback_once(:cancel, cancel_ok)

      self.unregister_with_channel
      self.unregister_with_queue

      @consumer_tag = nil

      # detach from object graph so that this object will be garbage-collected
      @queue        = nil
      @channel      = nil
      @connection   = nil

      self.clear_callbacks(:delivery)
      self.clear_callbacks(:consume)
      self.clear_callbacks(:cancel)
      self.clear_callbacks(:scancel)
    end # handle_cancel_ok(method)



    self.handle(AMQ::Protocol::Basic::ConsumeOk) do |connection, frame|
      channel  = connection.channels[frame.channel]
      consumer = channel.consumers_awaiting_consume_ok.shift

      consumer.handle_consume_ok(frame.decode_payload)
    end


    self.handle(AMQ::Protocol::Basic::CancelOk) do |connection, frame|
      channel  = connection.channels[frame.channel]
      consumer = channel.consumers_awaiting_cancel_ok.shift

      consumer.handle_cancel_ok(frame.decode_payload)
    end


    self.handle(AMQ::Protocol::Basic::Deliver) do |connection, method_frame, content_frames|
      channel       = connection.channels[method_frame.channel]
      basic_deliver = method_frame.decode_payload
      consumer      = channel.consumers[basic_deliver.consumer_tag]

      metadata = content_frames.shift
      payload  = content_frames.map { |frame| frame.payload }.join

      # Handle the delivery only if the consumer still exists.
      # The broker has been known to deliver a few messages after the consumer has been shut down.
      consumer.handle_delivery(basic_deliver, metadata, payload) if consumer
    end


    protected

    def register_with_channel
      @channel.consumers[@consumer_tag] = self
    end # register_with_channel

    def register_with_queue
      @queue.consumers[@consumer_tag]   = self
    end # register_with_queue

    def unregister_with_channel
      @channel.consumers.delete(@consumer_tag)
    end # register_with_channel

    def unregister_with_queue
      @queue.consumers.delete(@consumer_tag)
    end # register_with_queue

    handle(AMQ::Protocol::Basic::Cancel) do |connection, method_frame|
      channel      = connection.channels[method_frame.channel]
      basic_cancel = method_frame.decode_payload
      consumer     = channel.consumers[basic_cancel.consumer_tag]

      # Handle the delivery only if the consumer still exists.
      consumer.handle_cancel(basic_cancel) if consumer
    end
  end # Consumer
end # AMQP