1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
|
# encoding: utf-8
require "amqp/consumer_tag_generator"
module AMQP
# AMQP consumers are entities that handle messages delivered to them ("push API" as opposed to "pull API") by AMQP broker.
# Every consumer is associated with a queue. Consumers can be exclusive (no other consumers can be registered for the same queue)
# or not (consumers share the queue). In the case of multiple consumers per queue, messages are distributed in round robin
# manner with respect to channel-level prefetch setting).
#
# @see AMQP::Queue
# @see AMQP::Queue#subscribe
# @see AMQP::Queue#cancel
class Consumer
#
# Behaviours
#
include Callbacks
extend ProtocolMethodHandlers
#
# API
#
# @return [AMQP::Channel] Channel this consumer uses
attr_reader :channel
# @return [AMQP::Queue] Queue messages are consumed from
attr_reader :queue
# @return [String] Consumer tag, unique consumer identifier
attr_reader :consumer_tag
# @return [Hash] Custom subscription metadata
attr_reader :arguments
# @return [AMQP::ConsumerTagGenerator] Consumer tag generator
def self.tag_generator
@tag_generator ||= AMQP::ConsumerTagGenerator.new
end # self.tag_generator
# @param [AMQP::ConsumerTagGenerator] Assigns consumer tag generator that will be used by consumer instances
# @return [AMQP::ConsumerTagGenerator] Provided argument
def self.tag_generator=(generator)
@tag_generator = generator
end
def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false, &block)
@callbacks = Hash.new
@channel = channel || raise(ArgumentError, "channel is nil")
@connection = channel.connection || raise(ArgumentError, "connection is nil")
@queue = queue || raise(ArgumentError, "queue is nil")
@consumer_tag = consumer_tag || self.class.tag_generator.generate_for(queue)
@exclusive = exclusive
@no_ack = no_ack
@arguments = arguments
@no_local = no_local
self.register_with_channel
self.register_with_queue
end # initialize
# @return [Boolean] true if this consumer is exclusive (other consumers for the same queue are not allowed)
def exclusive?
!!@exclusive
end # exclusive?
# Begin consuming messages from the queue
# @return [AMQP::Consumer] self
def consume(nowait = false, &block)
@channel.once_open do
@queue.once_declared do
@connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, nowait, @arguments))
if !nowait
self.redefine_callback(:consume, &block)
@channel.consumers_awaiting_consume_ok.push(self)
end
self
end
end
self
end # consume(nowait = false, &block)
# Used by automatic recovery code.
# @api plugin
# @return [AMQP::Consumer] self
def resubscribe(&block)
@channel.once_open do
@queue.once_declared do
self.unregister_with_channel
@consumer_tag = self.class.tag_generator.generate_for(@queue)
self.register_with_channel
@connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, block.nil?, @arguments))
self.redefine_callback(:consume, &block) if block
self
end
end
self
end # resubscribe(&block)
# @return [AMQP::Consumer] self
def cancel(nowait = false, &block)
@channel.once_open do
@queue.once_declared do
@connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait))
if !nowait
self.redefine_callback(:cancel, &block)
@channel.consumers_awaiting_cancel_ok.push(self)
end
self
end
end
self
end # cancel(nowait = false, &block)
# {AMQP::Queue} API compatibility.
#
# @return [Boolean] true if this consumer is active (subscribed for message delivery)
# @api public
def subscribed?
!@callbacks[:delivery].empty?
end # subscribed?
# Legacy {AMQP::Queue} API compatibility.
# @private
# @deprecated
def callback
if @callbacks[:delivery]
@callbacks[:delivery].first
end
end # callback
# Register a block that will be used to handle delivered messages.
#
# @return [AMQP::Consumer] self
# @see AMQP::Queue#subscribe
def on_delivery(&block)
# We have to maintain this multiple arities jazz
# because older versions this gem are used in examples in at least 3
# books published by O'Reilly :(. MK.
delivery_shim = Proc.new { |basic_deliver, headers, payload|
case block.arity
when 1 then
block.call(payload)
when 2 then
h = Header.new(@channel, basic_deliver, headers.decode_payload)
block.call(h, payload)
else
h = Header.new(@channel, basic_deliver, headers.decode_payload)
block.call(h, payload, basic_deliver.consumer_tag, basic_deliver.delivery_tag, basic_deliver.redelivered, basic_deliver.exchange, basic_deliver.routing_key)
end
}
self.append_callback(:delivery, &delivery_shim)
self
end # on_delivery(&block)
# @return [String] Readable representation of relevant object state.
def inspect
"#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}"
end # inspect
def on_cancel(&block)
self.append_callback(:scancel, &block)
self
end # on_cancel(&block)
def handle_cancel(basic_cancel)
self.exec_callback(:scancel, basic_cancel)
end # handle_cancel(basic_cancel)
# @group Acknowledging & Rejecting Messages
# Acknowledge a delivery tag.
# @return [Consumer] self
#
# @api public
# @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
def acknowledge(delivery_tag)
@channel.acknowledge(delivery_tag)
self
end # acknowledge(delivery_tag)
#
# @return [Consumer] self
#
# @api public
# @see http://bit.ly/htCzCX AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
def reject(delivery_tag, requeue = true)
@channel.reject(delivery_tag, requeue)
self
end # reject(delivery_tag, requeue = true)
# Defines a callback that will be executed when AMQP connection is recovered after a network failure..
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
# Defines a callback that will be executed when AMQP connection is recovered after a network failure..
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
def on_recovery(&block)
self.redefine_callback(:after_recovery, &block)
end # on_recovery(&block)
alias after_recovery on_recovery
# @endgroup
# @group Error Handling & Recovery
# Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
def on_connection_interruption(&block)
self.redefine_callback(:after_connection_interruption, &block)
end # on_connection_interruption(&block)
alias after_connection_interruption on_connection_interruption
# @private
def handle_connection_interruption(method = nil)
self.exec_callback_yielding_self(:after_connection_interruption)
end # handle_connection_interruption
# Defines a callback that will be executed after TCP connection is recovered after a network failure
# but before AMQP connection is re-opened.
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
def before_recovery(&block)
self.redefine_callback(:before_recovery, &block)
end # before_recovery(&block)
# @private
def run_before_recovery_callbacks
self.exec_callback_yielding_self(:before_recovery)
end
# @private
def run_after_recovery_callbacks
self.exec_callback_yielding_self(:after_recovery)
end
# Called by associated connection object when AMQP connection has been re-established
# (for example, after a network failure).
#
# @api plugin
def auto_recover
self.exec_callback_yielding_self(:before_recovery)
self.resubscribe
self.exec_callback_yielding_self(:after_recovery)
end # auto_recover
# @endgroup
def to_s
"#<#{self.class.name} @consumer_tag=#{@consumer_tag} @queue=#{@queue.name} @channel=#{@channel.id}>"
end
#
# Implementation
#
def handle_delivery(basic_deliver, metadata, payload)
self.exec_callback(:delivery, basic_deliver, metadata, payload)
end # handle_delivery(basic_deliver, metadata, payload)
def handle_consume_ok(consume_ok)
self.exec_callback_once(:consume, consume_ok)
end # handle_consume_ok(consume_ok)
def handle_cancel_ok(cancel_ok)
self.exec_callback_once(:cancel, cancel_ok)
self.unregister_with_channel
self.unregister_with_queue
@consumer_tag = nil
# detach from object graph so that this object will be garbage-collected
@queue = nil
@channel = nil
@connection = nil
self.clear_callbacks(:delivery)
self.clear_callbacks(:consume)
self.clear_callbacks(:cancel)
self.clear_callbacks(:scancel)
end # handle_cancel_ok(method)
self.handle(AMQ::Protocol::Basic::ConsumeOk) do |connection, frame|
channel = connection.channels[frame.channel]
consumer = channel.consumers_awaiting_consume_ok.shift
consumer.handle_consume_ok(frame.decode_payload)
end
self.handle(AMQ::Protocol::Basic::CancelOk) do |connection, frame|
channel = connection.channels[frame.channel]
consumer = channel.consumers_awaiting_cancel_ok.shift
consumer.handle_cancel_ok(frame.decode_payload)
end
self.handle(AMQ::Protocol::Basic::Deliver) do |connection, method_frame, content_frames|
channel = connection.channels[method_frame.channel]
basic_deliver = method_frame.decode_payload
consumer = channel.consumers[basic_deliver.consumer_tag]
metadata = content_frames.shift
payload = content_frames.map { |frame| frame.payload }.join
# Handle the delivery only if the consumer still exists.
# The broker has been known to deliver a few messages after the consumer has been shut down.
consumer.handle_delivery(basic_deliver, metadata, payload) if consumer
end
protected
def register_with_channel
@channel.consumers[@consumer_tag] = self
end # register_with_channel
def register_with_queue
@queue.consumers[@consumer_tag] = self
end # register_with_queue
def unregister_with_channel
@channel.consumers.delete(@consumer_tag)
end # register_with_channel
def unregister_with_queue
@queue.consumers.delete(@consumer_tag)
end # register_with_queue
handle(AMQ::Protocol::Basic::Cancel) do |connection, method_frame|
channel = connection.channels[method_frame.channel]
basic_cancel = method_frame.decode_payload
consumer = channel.consumers[basic_cancel.consumer_tag]
# Handle the delivery only if the consumer still exists.
consumer.handle_cancel(basic_cancel) if consumer
end
end # Consumer
end # AMQP
|