1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
require "thread"
module Bunny
  # Network activity loop that reads incoming AMQP 0.9.1 frames from the
  # transport and hands them to the session for processing
  # (see {#run_once}: +handle_frame+ / +handle_frameset+).
  #
  # The loop runs in its own thread, created by {#start}.
  #
  # This mimics the way the RabbitMQ Java client is designed quite closely.
  # @private
  class ReaderLoop
    # @param transport [#read_next_frame] source of incoming frames
    # @param session [Object] owner of this loop; must provide +logger+,
    #   +closing?+, +closed?+, +automatically_recover?+, +handle_frame+,
    #   +handle_frameset+ and +handle_network_failure+
    # @param session_error_handler [#raise] object that receives a
    #   Bunny::NetworkFailure when a failure cannot be recovered from
    def initialize(transport, session, session_error_handler)
      @transport             = transport
      @session               = session
      @session_error_handler = session_error_handler
      @logger                = @session.logger

      # Guards @stopping and @stopped, which are read from other threads.
      @mutex           = Mutex.new
      @stopping        = false
      @stopped         = false
      @network_is_down = false
    end

    # Spawns the reader thread, which executes {#run_loop}.
    #
    # @return [Thread] the newly created thread
    def start
      @thread = Thread.new { run_loop }
    end

    # Alias-like wrapper around {#start}. Note that it does not reset
    # any of the state flags set by a previous run.
    #
    # @return [Thread]
    def resume
      start
    end

    # Body of the reader thread: calls {#run_once} until the loop is asked
    # to stop, has stopped, or a network failure has been detected.
    # Marks the loop as stopped on exit.
    def run_loop
      loop do
        begin
          break if @mutex.synchronize { @stopping || @stopped || @network_is_down }

          run_once
        rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Timeout::Error,
               OpenSSL::OpenSSLError => e
          # Errno::* errors (including Errno::EBADF, which is seen when the loop
          # iterates after the transport has been closed) are SystemCallError
          # subclasses and therefore end up here. When the loop or the session
          # is shutting down they are expected, so we simply leave the loop.
          break if terminate? || session_shutting_down?

          @network_is_down = true
          if @session.automatically_recover?
            log_exception(e, level: :warn)
            @session.handle_network_failure(e)
          else
            log_exception(e)
            @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
          end
        rescue ShutdownSignal => _
          @mutex.synchronize { @stopping = true }
          break
        rescue Exception => e
          # Deliberately broad: anything that escapes the reader thread has to be
          # reported to the session error handler instead of silently killing
          # the thread.
          break if terminate?

          unless session_shutting_down?
            log_exception(e)

            @network_is_down = true
            @session_error_handler.raise(Bunny::NetworkFailure.new("caught an unexpected exception in the network loop: #{e.message}", e))
          end
        end
      end

      @mutex.synchronize { @stopped = true }
    end

    # Reads one logical unit from the transport and dispatches it to the session.
    #
    # * heartbeat frames are ignored;
    # * a non-final frame, or a method that carries content, is followed by a
    #   header frame and as many body frames as needed to reach the size
    #   announced by the header; the three payloads go to +handle_frameset+;
    # * any other frame goes to +handle_frame+.
    def run_once
      frame = @transport.read_next_frame
      return if frame.is_a?(AMQ::Protocol::HeartbeatFrame)

      unless !frame.final? || frame.method_class.has_content?
        return @session.handle_frame(frame.channel, frame.decode_payload)
      end

      header        = @transport.read_next_frame
      expected_size = header.body_size
      # Unary plus yields a mutable string even when string literals are frozen
      # (or "chilled"), while keeping the literal's encoding.
      content = +''

      if expected_size > 0
        loop do
          content << @transport.read_next_frame.decode_payload
          break if content.bytesize >= expected_size
        end
      end

      @session.handle_frameset(frame.channel, [frame.decode_payload, header.decode_payload, content])
    end

    # Asks the loop to stop; takes effect the next time the loop checks its flags.
    def stop
      @mutex.synchronize { @stopping = true }
    end

    # @return [Boolean] true once {#run_loop} has exited
    def stopped?
      @mutex.synchronize { @stopped }
    end

    # @return [Boolean] true once a stop has been requested
    def stopping?
      @mutex.synchronize { @stopping }
    end

    # Flags the loop as stopping and raises the given exception in the
    # reader thread.
    #
    # @param e [Exception, Class] what to raise in the reader thread
    def terminate_with(e)
      @mutex.synchronize { @stopping = true }
      # Explicit receiver: this is ReaderLoop#raise, not Kernel#raise.
      self.raise(e)
    end

    # Raises the given exception inside the reader thread, if one was started.
    #
    # NOTE: this shadows Kernel#raise for receiver-less calls made inside
    # this class.
    #
    # @param e [Exception, Class]
    def raise(e)
      @thread.raise(e) if @thread
    end

    # Waits for the reader thread to finish.
    def join
      # Thread#join can/would trigger a re-raise of an unhandled exception in this thread.
      # In addition, Thread.handle_interrupt can be used by other libraries or application code
      # that would make this join operation fail with an obscure exception.
      # So we try to save everyone some really unpleasant debugging time by introducing
      # this condition which typically would not evaluate to true anyway.
      #
      # See ruby-amqp/bunny#589 and ruby-amqp/bunny#590 for background.
      @thread.join if @thread && @thread != Thread.current
    end

    # Kills the reader thread (if any) and waits for it to terminate.
    def kill
      return unless @thread

      @thread.kill
      @thread.join
    end

    protected

    # Logs the exception and its backtrace, one log entry per line.
    # I/O errors that happen while the session is closing or closed are
    # expected and therefore not logged.
    #
    # @param e [Exception]
    # @param level [Symbol] name of the logger method to use
    def log_exception(e, level: :error)
      return if io_error?(e) && session_shutting_down?

      @logger.send level, "Exception in the reader loop: #{e.class.name}: #{e.message}"
      @logger.send level, "Backtrace: "
      # Exception#backtrace is nil for an exception that has never been raised.
      Array(e.backtrace).each do |line|
        @logger.send level, "\t#{line}"
      end
    end

    # @return [Boolean] true if the exception is one of the I/O related
    #   error classes this loop treats as expected during shutdown
    def io_error?(e)
      [AMQ::Protocol::EmptyResponseError, IOError, SystemCallError].any? do |error_class|
        e.is_a?(error_class)
      end
    end

    # @return [Boolean] true if a stop was requested or the loop has already stopped
    def terminate?
      @mutex.synchronize { @stopping || @stopped }
    end

    # @return [Boolean] true if the session is closing or already closed
    def session_shutting_down?
      @session.closing? || @session.closed?
    end
  end
end
|