1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
|
# -*- encoding: utf-8 -*-
require 'socket'
require 'timeout'
require 'io/wait'
require 'digest/sha1'
module Stomp
class Connection
private
# Support multi-homed servers.
#
# Builds a copy of the connection parameters in which each entry of
# hash[:hosts] is replaced by one entry per resolved address family: the
# first IPv6 result reported by Socket.getaddrinfo (if any), followed by the
# first IPv4 result (if any).  Every new entry is a clone of the original
# host Hash with only :host overwritten by the resolved address.  Hosts for
# which getaddrinfo reports nothing are dropped.
#
# hash - parameters Hash; hash[:hosts] must be an Array of Hashes that each
#        carry a :host entry.  The argument itself is not modified.
#
# Returns the new parameters Hash.  A shallow copy of the original host list
# is kept under :hosts_cloned.
def _expand_hosts(hash)
  expanded = hash.clone
  expanded[:hosts_cloned] = hash[:hosts].clone
  expanded[:hosts] = []
  hash[:hosts].each do |host_parms|
    addrs = Socket.getaddrinfo(host_parms[:host], nil, nil, Socket::SOCK_STREAM)
    next if addrs.nil? || addrs.size == 0
    # Element 4 of each result is the numeric address family and element 3 is
    # the address string.  IPv6 is emitted ahead of IPv4.
    [Socket::AF_INET6, Socket::AF_INET].each do |family|
      match = addrs.detect { |entry| entry[4] == family }
      next unless match
      resolved = host_parms.clone
      resolved[:host] = match[3]
      expanded[:hosts] << resolved
    end
  end
  expanded
end
# Handle 1.9+ character representation.
#
# char - the value to normalize.
#
# Returns char unchanged when RUBY_VERSION compares (as a String) greater
# than '1.9'; otherwise returns char.chr.
def parse_char(char)
  return char if RUBY_VERSION > '1.9'
  char.chr
end
# Create parameters for any callback logger.
#
# Starts from a clone of @parameters (or an empty Hash when @parameters is
# not set) and adds the current connection state under the :cur_* keys plus
# :openstat.  @parameters itself is left untouched.
#
# Returns the resulting Hash.
def log_params
  snapshot = @parameters ? @parameters.clone : {}
  snapshot.merge!(
    :cur_host => @host,
    :cur_port => @port,
    :cur_login => @login,
    :cur_passcode => @passcode,
    :cur_ssl => @ssl,
    :cur_recondelay => @reconnect_delay,
    :cur_parseto => @parse_timeout,
    :cur_conattempts => @connection_attempts,
    :cur_failure => @failure, # To assist in debugging
    :openstat => open?
  )
  snapshot
end
# _pre_connect handles low level logic just prior to a physical connect.
#
# Steps, in order:
# * @connect_headers is replaced by its symbolize_keys form.
# * The accept-version and host headers must be supplied together; supplying
#   only one of them raises Stomp::Error::ProtocolErrorConnect.  Supplying
#   neither means a 1.0 connect and nothing more is done.
# * accept-version is split on "," and reduced to the versions listed in
#   Stomp::SUPPORTED.  @hhas10 records whether Stomp::SPL_10 is among them.
#   Stomp::Error::UnsupportedProtocolError is raised when none remain.
# * The reduced list is written back to the accept-version header.
# * When a heart-beat header is present it is checked by _validate_hbheader.
def _pre_connect
  @connect_headers = @connect_headers.symbolize_keys
  accept_version = @connect_headers[:"accept-version"]
  vhost = @connect_headers[:host]
  has_version = accept_version ? true : false
  has_vhost = vhost ? true : false
  # Exactly one of the two headers present is a caller error.
  raise Stomp::Error::ProtocolErrorConnect if has_version != has_vhost
  return unless has_version # 1.0
  # Try 1.1 or greater
  @hhas10 = false
  usable = accept_version.split(",").select { |ver| Stomp::SUPPORTED.index(ver) }
  @hhas10 = true if usable.include?(Stomp::SPL_10)
  raise Stomp::Error::UnsupportedProtocolError if usable.empty?
  @connect_headers[:"accept-version"] = usable.join(",") # This goes to server
  # Heartbeats - pre connect
  _validate_hbheader if @connect_headers[:"heart-beat"]
end
# _post_connect handles low level logic just after a physical connect.
#
# Does nothing unless both the accept-version and host connect headers were
# supplied (i.e. a 1.0 connect).  Otherwise:
# * If the server answered with an ERROR frame, its headers are replaced by
#   the result of _decodeHeaders and processing stops.
# * @protocol is taken from the :version header of the connection frame.  A
#   missing version means a 1.0 server: @protocol becomes Stomp::SPL_10 and
#   processing stops.  A version not listed in Stomp::SUPPORTED raises
#   Stomp::Error::UnsupportedProtocolError.
# * When a heart-beat connect header is present, _init_heartbeats is called.
def _post_connect
  return unless @connect_headers[:"accept-version"] && @connect_headers[:host] # 1.0
  if @connection_frame.command == Stomp::CMD_ERROR
    @connection_frame.headers = _decodeHeaders(@connection_frame.headers)
    return
  end
  # We are CONNECTed
  @protocol = @connection_frame.headers.symbolize_keys[:version]
  unless @protocol
    # CONNECTed to a 1.0 server that does not return *any* 1.1 type headers
    @protocol = Stomp::SPL_10 # reset
    return
  end
  # Should not happen, but check anyway
  raise Stomp::Error::UnsupportedProtocolError unless Stomp::SUPPORTED.index(@protocol)
  # Heartbeats
  _init_heartbeats if @connect_headers[:"heart-beat"]
end
# socket returns the socket used by the connection, creating and connecting
# a new one first when necessary.
#
# The whole body runs inside @socket_semaphore.synchronize.  The existing
# @socket is reused unless closed? is true or an earlier error is still
# recorded in @failure; in those cases open_socket() and connect() are
# called repeatedly until an attempt succeeds.
#
# When an attempt fails:
# * The error is stored in @failure and @closed is set to true.
# * The error is re-raised at once when @reliable is false, or when it is a
#   Stomp::Error::LoggerConnectionError or Stomp::Error::ProtocolError11p.
# * Otherwise the failure is reported through the :on_connectfail logger
#   callback, or on $stderr when slog returns a false value.
# * When max_reconnect_attempts? is true, Stomp::Error::MaxReconnectAttempts
#   is raised: in the thread stored under @parameters[:client_main] when one
#   is set (the current thread then exits), otherwise in the current thread.
# * Otherwise the method sleeps @reconnect_delay, increments
#   @connection_attempts and, when @parameters is set, selects the next host
#   and reconnect delay before retrying.
#
# The last expression of the synchronized block stores the socket in @socket;
# assuming @socket_semaphore.synchronize returns the block value (as
# Mutex#synchronize does), that socket is the return value.
def socket()
@socket_semaphore.synchronize do
used_socket = @socket
used_socket = nil if closed?
# A recorded @failure forces a new connect even if a socket object exists.
while used_socket.nil? || !@failure.nil?
@failure = nil
begin
used_socket = open_socket() # sets @closed = false if OK
# Open is complete
connect(used_socket)
slog(:on_connected, log_params)
# A successful connect restarts the attempt count.
@connection_attempts = 0
rescue
@failure = $!
used_socket = nil
@closed = true
raise unless @reliable
raise if @failure.is_a?(Stomp::Error::LoggerConnectionError)
# Catch errors which are:
# a) emitted from corrupted 1.1+ 'connect' (caller programming error)
# b) should never be retried
raise if @failure.is_a?(Stomp::Error::ProtocolError11p)
# Report the failure.  Any exception raised while reporting is discarded,
# except a LoggerConnectionError, which is re-raised.
begin
unless slog(:on_connectfail,log_params)
$stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n"
end
rescue Exception => aex
raise if aex.is_a?(Stomp::Error::LoggerConnectionError)
end
if max_reconnect_attempts?
$stderr.print "In socket() Reached MaxReconnectAttempts"
### _dump_threads()
# If a client main thread was registered, raise the error there and end
# the current thread; otherwise raise it in the current thread.
mt = @parameters[:client_main]
if !mt.nil?
mt.raise Stomp::Error::MaxReconnectAttempts
Thread::exit
end
raise Stomp::Error::MaxReconnectAttempts
end
# Wait, count the attempt, then move on to the next host and delay.
sleep(@reconnect_delay)
@connection_attempts += 1
if @parameters
change_host()
increase_reconnect_delay()
end
end
end
@socket = used_socket
end
end
# refine_params sets up defaults for a Hash initialize.
#
# params - caller supplied parameters.  Keys are normalized with
#          uncamelize_and_symbolize_keys (defined outside this method)
#          before being merged over the defaults below, so supplied values
#          always win.
#
# Returns the merged parameters Hash.  When the merged :dmh value is truthy
# the Hash is first passed through _expand_hosts and that result is returned.
def refine_params(params)
  supplied = params.uncamelize_and_symbolize_keys
  defaults = {
    :connect_headers => {},
    :reliable => true,
    # Failover parameters
    :initial_reconnect_delay => 0.01,
    :max_reconnect_delay => 30.0,
    :use_exponential_back_off => true,
    :back_off_multiplier => 2,
    :max_reconnect_attempts => 0,
    :randomize => false,
    :connect_timeout => 0,
    # Parse Timeout
    :parse_timeout => 5,
    :dmh => false,
    # Closed check logic
    :closed_check => true,
    :hbser => false,
    :stompconn => false,
    :max_hbread_fails => 0,
    :max_hbrlck_fails => 0,
    :fast_hbs_adjust => 0.0,
    :connread_timeout => 0,
    :tcp_nodelay => true,
    :start_timeout => 0,
    :sslctx_newparm => nil,
    :ssl_post_conn_check => true,
  }
  merged = defaults.merge(supplied)
  merged[:dmh] ? _expand_hosts(merged) : merged
end
# change_host selects the next host for retries.
#
# When @parameters[:randomize] is set the host list is first put into random
# order.  The first host of the list is then moved to the end of the list
# and becomes the current host: @ssl, @host, @port, @login and @passcode are
# loaded from it.  A missing :port falls back to
# Connection::default_port(@ssl); a missing :login or :passcode falls back
# to the empty string.
def change_host
  if @parameters[:randomize]
    @parameters[:hosts] = @parameters[:hosts].sort_by { rand }
  end
  host_list = @parameters[:hosts]
  # Set first as master and send it to the end of array
  chosen = host_list.shift
  host_list << chosen
  @ssl = chosen[:ssl]
  @host = chosen[:host]
  @port = chosen[:port] || Connection::default_port(@ssl)
  @login = chosen[:login] || ""
  @passcode = chosen[:passcode] || ""
end
# Duplicate parameters hash
#
# h - parameters Hash.  The host list is read from h[:hosts], or from
#     h["hosts"] when h[:hosts] is nil.
#
# Returns a new Hash holding every entry of h, with :hosts replaced by a new
# Array that contains a dup of each host entry.  All other values are shared
# with h.
def _hdup(h)
  copy = {}.merge(h)
  copy[:hosts] = []
  source_hosts = h[:hosts].nil? ? h["hosts"] : h[:hosts]
  copy[:hosts] = source_hosts.map { |entry| entry.dup }
  copy
end
# max_reconnect_attempts? reports whether the configured maximum number of
# reconnect attempts has been reached.
#
# Returns false when @parameters is nil, or when
# @parameters[:max_reconnect_attempts] is nil or 0.  Otherwise returns
# whether @connection_attempts is greater than or equal to that maximum.
def max_reconnect_attempts?
  return false if @parameters.nil?
  limit = @parameters[:max_reconnect_attempts]
  return false if limit.nil?
  return false unless limit != 0
  @connection_attempts >= limit
end
# increase_reconnect_delay increases the reconnect delay for the next connection
# attempt.
#
# When @parameters[:use_exponential_back_off] is set, @reconnect_delay is
# multiplied by @parameters[:back_off_multiplier].  In every case the delay
# is then capped at @parameters[:max_reconnect_delay].
#
# Returns the new @reconnect_delay.
def increase_reconnect_delay
  if @parameters[:use_exponential_back_off]
    @reconnect_delay *= @parameters[:back_off_multiplier]
  end
  if @reconnect_delay > @parameters[:max_reconnect_delay]
    @reconnect_delay = @parameters[:max_reconnect_delay]
  end
  @reconnect_delay
end
# __old_receive receives a frame, blocks until the frame is received.
#
# Each pass obtains the current socket via socket() and hands it to
# _receive, whose result is returned.
#
# Error handling:
# * Stomp::Error::MaxReconnectAttempts is recorded in @failure, reported
#   (:on_miscerr logger callback, or $stderr when slog returns a false
#   value) and re-raised.
# * Any other StandardError is recorded in @failure.  It is re-raised when
#   @reliable is false; otherwise it is reported the same way, _reconn_prep
#   is called and the receive is attempted again.
def __old_receive
  # The receive may fail so we may need to retry.
  # `while true` is kept rather than `loop`: `loop` would silently swallow a
  # StopIteration raised from inside the body.
  while true
    begin
      sock = socket
      # Third argument is true for SSL or JRuby connections (presumably
      # "do not use IO.select" - confirm against _receive).
      return _receive(sock, false, (@ssl || @jruby) ? true : false)
    rescue Stomp::Error::MaxReconnectAttempts => limit_error
      @failure = limit_error
      unless slog(:on_miscerr, log_params, "Reached MaxReconnectAttempts")
        $stderr.print "Reached MaxReconnectAttempts\n"
      end
      raise
    rescue => receive_error
      @failure = receive_error
      raise unless @reliable
      message = "receive failed: #{receive_error}"
      unless slog(:on_miscerr, log_params, "es1_oldrecv: " + message)
        $stderr.print "\non_miscerr\n"
        $stderr.print log_params.inspect
        $stderr.print "\n"
        $stderr.print "es2_oldrecv: " + message
        $stderr.print "\n"
      end
      # !!! This initiates a re-connect !!!
      _reconn_prep
    end
  end
end
# _reconn_prep prepares for a reconnect retry
#
# Closes the current socket, moves to the next host when @parameters is
# set, kills the threads held in @st and @rt when present (presumably the
# heartbeat ticker threads - confirm where they are created), and clears
# @socket.
def _reconn_prep
  close_socket
  change_host if @parameters
  # Kill ticker threads if any, @st first.
  [@st, @rt].each { |ticker| ticker.kill if ticker }
  @socket = nil
end
end # class Connection
end # module Stomp
|