1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
|
require 'socket'
require 'fcntl'
module Riemann
class Client
# TcpSocket: A specialized TCP socket configured with connect, read and write timeouts and optional TCP keepalive.
class TcpSocket
# Base class for the errors this socket raises itself; connection failures
# are re-raised as this type.
class Error < Riemann::Client::Error; end
# Raised when connecting, reading or writing does not complete within the
# corresponding timeout.
class Timeout < Error; end
# Internal:
# The timeout for reading in seconds. Defaults to 2.
# This is the only timeout that can be reassigned after construction.
attr_accessor :read_timeout
# Internal:
# The timeout for connecting in seconds. Defaults to 2.
# It is a single budget shared by all addresses tried during one connect.
attr_reader :connect_timeout
# Internal:
# The timeout for writing in seconds. Defaults to 2.
attr_reader :write_timeout
# Internal:
# The host this socket connects to, as given in the :host option.
attr_reader :host
# Internal:
# The port this socket connects to, as given in the :port option.
attr_reader :port
# Internal
#
# Used for setting TCP_KEEPIDLE: overrides tcp_keepalive_time for a single
# socket. Defaults to 60.
#
# http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
#
# tcp_keepalive_time:
#
# The interval between the last data packet sent (simple ACKs are not
# considered data) and the first keepalive probe; after the connection is
# marked to need keepalive, this counter is not used any further.
attr_reader :keepalive_idle
# Internal
#
# Used for setting TCP_KEEPINTVL: overrides tcp_keepalive_intvl for a single
# socket. Defaults to 30.
#
# http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
#
# tcp_keepalive_intvl:
#
# The interval between subsequent keepalive probes, regardless of what
# the connection has exchanged in the meantime.
attr_reader :keepalive_interval
# Internal
#
# Used for setting TCP_KEEPCNT: overrides tcp_keepalive_probes for a single
# socket. Defaults to 5.
#
# http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
#
# tcp_keepalive_probes:
#
# The number of unacknowledged probes to send before considering the
# connection dead and notifying the application layer.
attr_reader :keepalive_count
# Internal: Build a socket wrapper from the given options and connect it.
#
# options - a Hash of settings; the same keys the constructor accepts.
#
# Returns the newly created instance of this class (not the raw ::Socket
# that #connect returns).
def self.connect(options = {})
  new(options).tap { |instance| instance.connect }
end
# Internal: Set up a new, not yet connected, socket wrapper.
#
# options - a Hash of settings:
#           :host, :port        - where to connect to
#           :connect_timeout,
#           :read_timeout,
#           :write_timeout      - per-operation timeouts in seconds; each
#                                 falls back to :timeout and then to 2
#           :keepalive_active   - whether to request TCP keepalive
#                                 (default true)
#           :keepalive_idle     - default 60
#           :keepalive_interval - default 30
#           :keepalive_count    - default 5
#
# No connection is made here; @socket starts out as nil.
def initialize(options = {})
  @host = options[:host]
  @port = options[:port]

  # Every specific timeout falls back to the generic :timeout, then to 2.
  [:connect_timeout, :read_timeout, :write_timeout].each do |key|
    instance_variable_set("@#{key}", options[key] || options[:timeout] || 2)
  end

  # fetch (not ||) so that an explicit false is honoured.
  @keepalive_active = options.fetch(:keepalive_active, true)

  { :keepalive_idle => 60, :keepalive_interval => 30, :keepalive_count => 5 }.each do |key, default|
    instance_variable_set("@#{key}", options[key] || default)
  end

  @socket = nil
end
# Internal: Return whether or not the keepalive_active flag is set.
#
# The flag is taken from the :keepalive_active constructor option. It only
# expresses the caller's wish; whether keepalive is really applied is
# decided by #using_keepalive?.
#
# Returns the stored flag value as-is.
def keepalive_active?
@keepalive_active
end
# Internal: Low level socket allocation and option configuration
#
# Using the options from the initializer, a new ::Socket is created that
# is:
#
# TCP, IPv4 only, close-on-exec, has Nagle's algorithm disabled and has the
# TCP keepalive options set when #using_keepalive? says they are usable.
#
# If any of the configuration calls fails, the freshly allocated socket is
# closed before the error is re-raised, so a failure here does not leak a
# file descriptor.
#
# Returns a new, unconnected ::Socket instance
def blank_socket
  sock = ::Socket.new(::Socket::AF_INET, ::Socket::SOCK_STREAM, 0)

  begin
    # close file descriptors if we exec
    sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)

    # Disable Nagle's algorithm
    sock.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)

    if using_keepalive?
      sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true)
      sock.setsockopt(::Socket::SOL_TCP, ::Socket::TCP_KEEPIDLE, keepalive_idle)
      sock.setsockopt(::Socket::SOL_TCP, ::Socket::TCP_KEEPINTVL, keepalive_interval)
      sock.setsockopt(::Socket::SOL_TCP, ::Socket::TCP_KEEPCNT, keepalive_count)
    end
  rescue StandardError
    # Nobody else holds a reference to this socket yet; release it here.
    sock.close unless sock.closed?
    raise
  end

  sock
end
# Internal: Return the connected raw Socket.
#
# If the socket is closed or non-existent it will create and connect again.
# The new connection replaces whatever was stored before, including a
# socket object that still exists but has been closed.
#
# Returns a ::Socket
def socket
  return @socket unless closed?

  # Plain assignment, not ||=: a closed socket object is still truthy and
  # would otherwise be handed back instead of reconnecting.
  @socket = connect
end
# Internal: Shut down the wrapped ::Socket, if there is an open one, and
# forget it.
#
# Calling this on an already closed or never connected wrapper is fine.
#
# Returns nothing
def close
  still_open = !closed?
  @socket.close if still_open
  @socket = nil
end
# Internal: Tell whether there is currently no usable socket.
#
# Returns true when no socket has been stored or the stored one reports
# itself closed, false otherwise.
def closed?
  (@socket.nil? || @socket.closed?) ? true : false
end
# Internal:
#
# Connect to the remote host in a non-blocking fashion.
#
# The host is resolved to its IPv4/TCP addresses and each one is tried in
# turn (via #connect_or_error) until one succeeds. All attempts share one
# deadline of connect_timeout seconds.
#
# On success the new socket is stored as this object's socket, so that
# subsequent reads and writes use this connection instead of opening a
# second one. A previously stored socket that is still open is closed.
#
# Raises the first error collected from the attempts when none succeeds,
# or Error when there was no address to try at all. Errors raised by the
# address lookup itself are not rescued here.
#
# Returns the ::Socket on success
def connect
  # Calculate our timeout deadline
  deadline = Time.now.to_f + connect_timeout

  # Lookup destination address, we only want IPv4 , TCP
  addrs = ::Socket.getaddrinfo(host, port, ::Socket::AF_INET, ::Socket::SOCK_STREAM)
  errors = []

  # Invoked by #find when no address yielded a socket.
  conn_error = lambda do
    raise errors.first unless errors.empty?
    raise Error, "Could not connect to #{host}:#{port}: address lookup returned no candidates"
  end

  sock = nil
  addrs.find(conn_error) do |addr|
    sock = connect_or_error(addr, deadline, errors)
  end

  stale = @socket
  @socket = sock
  stale.close if stale && !stale.equal?(sock) && !stale.closed?

  sock
end
# Internal: Try one address, recording the failure instead of raising it.
#
# addr     - An address returned from Socket.getaddrinfo()
# deadline - the point in time (Time#to_f seconds) after which the attempt
#            counts as timed out
# errors   - a collection that receives the error, should there be one
#
# The time left until the deadline becomes the timeout of the attempt. When
# the deadline has already passed, a Timeout is recorded without trying.
#
# Only errors of this class's Error family are captured; anything else
# propagates to the caller.
#
# Returns the connected socket on success, false on a captured failure.
def connect_or_error(addr, deadline, errors)
  begin
    remaining = deadline - Time.now.to_f
    if remaining <= 0
      raise Timeout, "Could not connect to #{host}:#{port}"
    end

    connect_nonblock(addr, remaining)
  rescue Error => failure
    errors << failure
    false
  end
end
# Internal: Connect to the given address within the timeout.
#
# addr    - An address as returned from Socket.getaddrinfo(); element 1 is
#           used as the port and element 3 as the IP address.
# timeout - how many seconds to wait for the connection to be established.
#
# A fresh socket from #blank_socket is connected without blocking. When the
# connection cannot complete immediately, we wait until the socket becomes
# writable and then confirm the result via #connect_nonblock_finalize.
#
# Raises Timeout if the socket did not become writable in time, and Error
# for any other failure; in both cases the socket is closed first.
#
# Returns the connected ::Socket.
def connect_nonblock(addr, timeout)
  remote_port = addr[1]
  remote_ip = addr[3]
  sockaddr = ::Socket.pack_sockaddr_in(remote_port, remote_ip)
  sock = blank_socket

  sock.connect_nonblock(sockaddr)
  sock
rescue Errno::EINPROGRESS
  # The connection is still being established; wait for writability.
  writable = IO.select(nil, [sock], nil, timeout)
  return connect_nonblock_finalize(sock, sockaddr) unless writable.nil?

  begin
    sock.close
  rescue StandardError
    nil
  end
  raise Timeout, "Could not connect to #{host}:#{port} within #{timeout} seconds"
rescue StandardError => ex
  # sock may still be nil here if allocation itself failed, hence the
  # best-effort close.
  begin
    sock.close
  rescue StandardError
    nil
  end
  raise Error, "Could not connect to #{host}:#{port}: #{ex.class}: #{ex.message}", ex.backtrace
end
# Internal: Make sure that a non-blocking connect has truly connected.
#
# sock     - the socket whose connection attempt was in progress
# sockaddr - the packed address the socket is being connected to
#
# The connect call is issued once more: it either succeeds, reports that
# the socket is already connected (which also counts as success), or fails
# with the reason the connection could not be made.
#
# Raises Error, after closing the socket, on any other failure.
#
# Returns the socket.
def connect_nonblock_finalize(sock, sockaddr)
  begin
    sock.connect_nonblock(sockaddr)
  rescue Errno::EISCONN
    # Already connected: this is the success case.
  end

  sock
rescue StandardError => ex
  begin
    sock.close
  rescue StandardError
    nil
  end
  raise Error, "Could not connect to #{host}:#{port}: #{ex.class}: #{ex.message}", ex.backtrace
end
# Internal: Decide whether TCP keepalive will be configured on new sockets.
#
# Keepalive is used only when it was requested (#keepalive_active?) and
# every ::Socket constant needed to configure it exists. Some operating
# systems do not define all of them, in which case keepalive is skipped
# rather than attempted.
#
# Returns true or false
def using_keepalive?
  return false unless keepalive_active?

  required = [:SOL_SOCKET, :SO_KEEPALIVE, :SOL_TCP, :TCP_KEEPIDLE, :TCP_KEEPINTVL, :TCP_KEEPCNT]
  required.all? { |name| ::Socket.const_defined?(name) }
end
# Reads length bytes from the socket
#
# length - the number of bytes to read from the socket
# outbuf - an optional buffer to store the bytes in; it is emptied first
#
# Data is gathered through repeated #readpartial calls, each asking only
# for what is still missing. Reading stops early if #readpartial returns
# nothing, so the result can be shorter than length.
#
# Returns the bytes read; this is outbuf itself when one was given.
def read(length, outbuf = nil)
  buf = outbuf || ''
  buf.replace('') if outbuf

  until buf.length >= length
    chunk = readpartial(length - buf.length)
    break unless chunk

    buf << chunk
  end

  buf
end
# Internal: Read up to a maxlen of data from the socket and store it in outbuf
#
# maxlen - the maximum number of bytes to read from the socket
# outbuf - the buffer in which to store the bytes.
#
# The read itself never blocks. When no data is available yet (or the
# connection reports a reset) we wait up to read_timeout seconds for the
# socket to become readable and then try again.
#
# Raises Timeout if the socket does not become readable in time.
#
# Returns the bytes read
def readpartial(maxlen, outbuf = nil)
  begin
    socket.read_nonblock(maxlen, outbuf)
  rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::ECONNRESET
    unless wait_readable(read_timeout)
      raise Timeout, "Could not read from #{host}:#{port} in #{read_timeout} seconds"
    end

    retry
  end
end
# Internal: Write the given data to the socket
#
# buf - the data to write to the socket. nil or an empty string writes
#       nothing.
#
# The data is written without blocking, possibly in several pieces. After
# each partial write the remainder is taken by *byte* offset, because the
# socket reports how many bytes (not characters) it accepted; slicing by
# character would skip or corrupt data in multibyte strings.
#
# When the socket cannot accept data right now we wait up to write_timeout
# seconds for it to become writable and then continue with what is left.
#
# Raises Timeout if it is unable to write the data to the socket within the
# write_timeout.
#
# returns nothing
def write(buf)
  until buf.nil? || buf.empty?
    written = socket.write_nonblock(buf)
    buf = buf.byteslice(written, buf.bytesize - written)
  end

  nil
rescue Errno::EWOULDBLOCK, Errno::EINTR, Errno::EAGAIN, Errno::ECONNRESET
  # buf already points at the unsent remainder, so retrying resumes there.
  if wait_writable(write_timeout)
    retry
  else
    raise Timeout, "Could not write to #{host}:#{port} in #{write_timeout} seconds"
  end
end
# Internal: Block until the stored socket can be written to.
#
# timeout - seconds to wait; write_timeout is used when this is nil.
#
# Returns what IO.select returns: nil if the time ran out, otherwise the
# arrays of ready IO objects.
def wait_writable(timeout = nil)
  limit = timeout || write_timeout
  IO.select(nil, [@socket], nil, limit)
end
# Internal: Block until the stored socket has something to read.
#
# timeout - seconds to wait; read_timeout is used when this is nil.
#
# Returns what IO.select returns: nil if the time ran out, otherwise the
# arrays of ready IO objects.
def wait_readable(timeout = nil)
  limit = timeout || read_timeout
  IO.select([@socket], nil, nil, limit)
end
end
end
end
|