1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466
|
require "cabin" # rubygem "cabin"
require "ftw/dns"
require "ftw/poolable"
require "ftw/namespace"
require "ftw/agent"
require "socket"
require "timeout" # ruby stdlib, just for the Timeout exception.
if RUBY_VERSION =~ /^1\.8/
# for Array#rotate, IO::WaitWritable, etc, in ruby < 1.9
require "backports"
end
require "openssl"
# A network connection. This is TCP.
#
# You can use IO::select on objects of this type.
# (at least, in MRI you can)
#
# You can activate SSL/TLS on this connection by invoking FTW::Connection#secure
#
# This class also implements buffering itself because some IO-like classes
# (OpenSSL::SSL::SSLSocket) do not support IO#ungetbyte
class FTW::Connection
include FTW::Poolable
include Cabin::Inspectable
# Returned by #connect when the connection attempt did not complete in time.
class ConnectTimeout < StandardError; end
# Returned by #connect when the remote end refused the connection.
class ConnectRefused < StandardError; end
# Raised by #read when no data became readable within the timeout.
class ReadTimeout < StandardError; end
# Raised by #write when the socket did not become writable within the timeout.
class WriteTimeout < StandardError; end
# Raised by #do_secure when the TLS handshake exceeds its timeout.
class SecureHandshakeTimeout < StandardError; end
# Raised by #secure for unusable settings (e.g. an unknown SSL/TLS version).
class InvalidConfiguration < StandardError; end
# Instance methods defined below are private by default; the public API is
# re-exported explicitly by the public(...) call at the end of the class.
private
# Build a new (not yet connected) client connection.
#
# 'destinations' is either one "host:port" string or an array of such
# strings. A lone string is wrapped into a one-element array; anything
# else is stored as given.
#
# Example:
#
#     conn = FTW::Connection.new(["1.2.3.4:80", "1.2.3.5:80"])
#
# With several destinations, each (re)connect picks the next one in
# round-robin order.
def initialize(destinations)
  @destinations = destinations.is_a?(String) ? [destinations] : destinations
  @mode = :client
  setup
end # def initialize
# Initialize the per-connection state: logger, buffers and status flags.
def setup
  @logger = Cabin::Channel.get
  @connect_timeout = 2

  # Reusable fixed-size receive buffer. It is marked BINARY (when the
  # running Ruby knows about encodings) because not every byte sequence
  # read off the wire is valid UTF-8.
  @read_size = 16384
  @read_buffer = " " * @read_size
  @read_buffer.force_encoding("BINARY") if @read_buffer.respond_to?(:force_encoding)
  @pushback_buffer = ""

  # Instance variables exposed through Cabin::Inspectable.
  @inspectables = [:@destinations, :@connected, :@remote_address, :@secure]

  # TODO(sissel): Validate @destinations
  # TODO(sissel): Barf if a destination is not of the form "host:port"
  @connected = false
  @remote_address = nil
  @secure = false
end # def setup
# Create a new connection from an existing IO instance (like a socket)
#
# Valid modes are :server and :client.
#
# * specify :server if this connection is from a server (via Socket#accept)
# * specify :client if this connection is from a client (via Socket#connect)
#
# @param io the already-connected socket; it must answer #getpeername
# @param [Symbol] mode :server (default) or :client
# @return [FTW::Connection] a connection wrapping 'io', marked as connected
# @raise [ArgumentError] if 'mode' is not one of the valid modes
def self.from_io(io, mode=:server)
  valid_modes = [:server, :client]
  if !valid_modes.include?(mode)
    # ArgumentError is Ruby's standard error for a bad argument value. The
    # previous code referenced an undefined constant here, so callers got a
    # NameError instead of this message.
    raise ArgumentError, "Invalid connection mode '#{mode}'. Valid modes: #{valid_modes.inspect}"
  end

  connection = self.new(nil) # New connection with no destinations
  # The state lives in instance variables without writers, so set it from
  # inside the new instance.
  connection.instance_eval do
    @socket = io
    @connected = true
    port, address = Socket.unpack_sockaddr_in(io.getpeername)
    @remote_address = "#{address}:#{port}"
    @mode = mode
  end
  return connection
end # def self.from_io
# Connect now.
#
# Timeout value is optional. If no timeout is given, this method
# blocks until a connection is successful or an error occurs.
#
# The next destination (round-robin) is resolved through FTW::DNS and the
# resolved addresses are tried in order. The first address that fails ends
# the attempt; the failure is *returned*, not raised.
#
# You should check the return value of this method to determine if
# a connection was successful.
#
# Possible return values on error include:
#
# * FTW::Connection::ConnectRefused
# * FTW::Connection::ConnectTimeout
#
# @param [Numeric, nil] timeout seconds to wait for the socket to become
#   writable, nil to wait indefinitely
# @return [nil] if the connection was successful
# @return [StandardError or subclass] if the connection failed
def connect(timeout=nil)
  # TODO(sissel): Raise if we're already connected?
  disconnect("reconnecting") if connected?
  host, port = @destinations.first.split(":")
  @destinations = @destinations.rotate # round-robin

  # A fresh plain-TCP socket is created below, so any previous state is
  # void. Resetting @secure matters: otherwise #secure would see
  # secured? == true and skip the TLS handshake on the new socket.
  @connected = false
  @secure = false

  # Do dns resolution on the host and walk the resolved addresses.
  addresses = FTW::DNS.singleton.resolve(host)
  addresses.each do |address|
    @remote_address = address

    # Addresses with colon ':' in them are assumed to be IPv6
    family = @remote_address.include?(":") ? Socket::AF_INET6 : Socket::AF_INET
    @logger.debug("Connecting", :address => @remote_address,
                  :host => host, :port => port, :family => family)
    @socket = Socket.new(family, Socket::SOCK_STREAM, 0)
    @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

    @logger.debug("packing", :data => [port.to_i, @remote_address])
    sockaddr = Socket.pack_sockaddr_in(port.to_i, @remote_address)

    # TODO(sissel): Support local address binding

    # Connect with timeout
    begin
      @socket.connect_nonblock(sockaddr)
    rescue IO::WaitWritable, Errno::EINPROGRESS
      # The connect is in progress; wait until the socket becomes writable.
      writable = writable?(timeout)

      # http://jira.codehaus.org/browse/JRUBY-6528; IO.select doesn't behave
      # correctly on JRuby < 1.7, so work around it.
      if writable || (RUBY_PLATFORM == "java" && JRUBY_VERSION < "1.7.0")
        begin
          @socket.connect_nonblock(sockaddr) # check connection failure
        rescue Errno::EISCONN
          # Ignore, we're already connected.
        rescue Errno::ECONNREFUSED
          @socket.close
          return ConnectRefused.new("#{host}[#{@remote_address}]:#{port}")
        rescue Errno::ETIMEDOUT, Errno::EINPROGRESS
          # ETIMEDOUT: the system's own TCP timeout fired.
          # EINPROGRESS: likely JRuby < 1.7.0, where reaching this point
          # still "in progress" means we have timed out.
          # TODO(sissel): We should instead do 'retry' unless we've exceeded
          # the timeout.
          @socket.close
          return ConnectTimeout.new("#{host}[#{@remote_address}]:#{port}")
        end
      else
        # Connection timeout; don't leak the half-open socket.
        @socket.close
        return ConnectTimeout.new("#{host}[#{@remote_address}]:#{port}")
      end
    rescue Errno::ECONNREFUSED
      # Some platforms refuse immediately (e.g. loopback) instead of
      # reporting EINPROGRESS first; report it the documented way.
      @socket.close
      return ConnectRefused.new("#{host}[#{@remote_address}]:#{port}")
    end

    # No error at this point, whether the connect completed immediately or
    # after waiting: we're now connected, so stop trying addresses.
    @connected = true
    break
  end # addresses.each
  return nil
end # def connect
# Reports the connection state flag (@connected).
def connected?
  @connected
end # def connected?
# Send data over this connection.
#
# Waits for the socket to become writable: indefinitely when 'timeout' is
# nil, otherwise at most 'timeout' seconds.
#
# A single IO#syswrite is issued, so fewer bytes than given may be written.
#
# @return the number of bytes written (See also IO#syswrite)
# @raise [FTW::Connection::WriteTimeout] if the socket did not become
#   writable in time
def write(data, timeout=nil)
  #connect if !connected?
  raise FTW::Connection::WriteTimeout.new(self.inspect) unless writable?(timeout)

  @socket.syswrite(data)
end # def write
# Read data from this connection
# This method blocks until the read succeeds unless a timeout is given.
#
# Data previously handed to #pushback is returned first. At most 'length'
# bytes are returned; pushed-back bytes beyond 'length' stay buffered for
# the next call. When pushback data is available the socket is only polled
# (never waited on), because there is already something to return.
#
# This method is not guaranteed to read exactly 'length' bytes. See
# IO#sysread
#
# @param [Integer] length maximum number of bytes to return
# @param [Numeric, nil] timeout seconds to wait for data, nil to block
# @return [String] the data read, BINARY-encoded where encodings exist
# @raise [ReadTimeout] if nothing became readable in time and there was no
#   pushback data
# @raise [EOFError] if the peer closed the connection and there was no
#   pushback data; the socket is closed and the connection marked
#   disconnected
def read(length=16384, timeout=nil)
  # Take ownership of whatever was pushed back and treat it as raw bytes,
  # so lengths are byte counts and appending binary socket data cannot hit
  # an encoding mismatch.
  data = @pushback_buffer
  @pushback_buffer = ""
  data.force_encoding("BINARY") if data.respond_to?(:force_encoding)

  have_pushback = !data.empty?
  if have_pushback
    if data.length >= length
      # The pushback alone satisfies the request: keep the surplus buffered
      # and leave the socket alone (asking sysread for a zero or negative
      # number of bytes is pointless or an error).
      @pushback_buffer = data.slice!(length..-1)
      return data
    end
    # We have data 'now' so don't wait.
    timeout = 0
  end

  if readable?(timeout)
    begin
      # Read at most 'length' data, so read less from the socket
      # We'll read less than 'length' if the pushback buffer has
      # data in it already.
      @socket.sysread(length - data.length, @read_buffer)
      data << @read_buffer
      return data
    rescue EOFError => e
      # Don't throw away pushed-back bytes because the peer hung up; hand
      # them over now. The next read hits EOF again and reports it.
      return data if have_pushback
      @socket.close
      @connected = false
      raise e
    end
  else
    return data if have_pushback
    raise ReadTimeout.new
  end
end # def read
# Return 'data' to the connection: it is appended to the pushback buffer,
# which #read drains before touching the socket.
def pushback(data)
  @pushback_buffer.concat(data)
end # def pushback
# End this connection, specifying why.
#
# Both directions of the underlying socket are shut down. For a TLS
# connection the SSL layer is closed first, then the wrapped socket.
# Closing is best-effort: errors from an already-closed or already-reset
# socket are ignored. Afterwards the connection reports neither
# connected? nor secured?.
#
# @param reason why the connection is being closed (currently unused)
# @return [nil]
def disconnect(reason)
  io = @socket

  # Whatever happens while closing, this connection is done. Clearing
  # @secure matters for reconnects: the next socket starts out as plain TCP
  # and must go through #secure again.
  @connected = false
  @secure = false

  # Never connected: nothing to close.
  return nil if io.nil?

  if io.is_a?(OpenSSL::SSL::SSLSocket)
    io.sysclose
    io = io.io
  end

  begin
    io.close_read
  rescue IOError, SystemCallError
    # Ignore: the stream is already closed, or the peer is already gone
    # (shutdown can fail with e.g. Errno::ENOTCONN).
  end

  begin
    io.close_write
  rescue IOError, SystemCallError
    # Ignore, same reasoning as for close_read.
  end

  return nil
end # def disconnect
# Can this connection be written to within 'timeout' seconds?
#
# Fractional seconds are OK; nil waits indefinitely. IO.select returns nil
# when the timeout expires, which is what makes this answer false.
def writable?(timeout)
  ready = IO.select(nil, [@socket], nil, timeout)
  !ready.nil?
end # def writable?
# Is there something to read on this connection within 'timeout' seconds?
#
# Fractional seconds are OK; nil waits indefinitely. IO.select returns nil
# when the timeout expires, which is what makes this answer false.
def readable?(timeout)
  ready = IO.select([@socket], nil, nil, timeout)
  !ready.nil?
end # def readable?
# The remote address of this connection, as stored in @remote_address.
def peer
  @remote_address
end # def peer
# Expose the underlying socket so this object can be handed to IO::select.
def to_io
  @socket
end # def to_io
# Secure this connection with TLS.
#
# Does nothing if the connection is already secured. Otherwise the current
# socket is wrapped in an OpenSSL::SSL::SSLSocket and the handshake is run:
# as a TLS client for client connections, as a TLS server otherwise.
#
# Options:
#
# * :certificate_store, an OpenSSL::X509::Store
# * :timeout, a timeout threshold in seconds.
# * :ciphers, an OpenSSL ciphers string, see `openssl ciphers` manual for details.
# * :ssl_version, any of: SSLv2, SSLv3, TLSv1, TLSv1.1, TLSv1.2
# * :certificate, an OpenSSL::X509::Certificate
# * :key, an OpenSSL::PKey (like OpenSSL::PKey::RSA)
# * :verify_callback, an object responding to #call, invoked with the
#   arguments OpenSSL passes to SSLContext#verify_callback. Its return
#   value decides whether a certificate is accepted.
#
# Both `certificate` and `key` are highly recommended if the connection
# belongs to a server (not a client connection).
#
# Notes:
# * Version may depend on your platform (openssl compilation settings, JVM
#   version, export restrictions, etc)
# * Available ciphers will depend on your version of Ruby (or JRuby and JVM),
#   OpenSSL, etc.
#
# @param [Hash, nil] options overrides for the defaults; nil means defaults
# @raise [InvalidConfiguration] if :ssl_version names no known method
# @raise [SecureHandshakeTimeout] if the handshake exceeds :timeout
def secure(options=nil)
  # Skip this if we're already secure.
  return if secured?

  defaults = {
    :timeout => nil,
    :ciphers => FTW::Agent::Configuration::SSL_CIPHER_MAP["MOZILLA_MODERN"],
    :ssl_version => "TLSv1.1"
  }
  # With no options the defaults are the settings. (Merging only when
  # options were given used to leave 'settings' nil and crash below.)
  settings = options.nil? ? defaults : defaults.merge(options)

  @logger.info("Securing this connection", :peer => peer, :options => settings)
  # Wrap this connection with TLS/SSL
  sslcontext = OpenSSL::SSL::SSLContext.new
  # If you use VERIFY_NONE, you are removing the trust feature of TLS. Don't do that.
  # Encryption without trust means you don't know who you are talking to.
  sslcontext.verify_mode = OpenSSL::SSL::VERIFY_PEER

  # ruby-core is refusing to patch ruby's default openssl settings to be more
  # secure, so let's fix that here. The next few lines setting options and
  # ciphers come from jmhodges' proposed patch
  ssloptions = OpenSSL::SSL::OP_ALL
  if defined?(OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS)
    ssloptions &= ~OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS
  end
  if defined?(OpenSSL::SSL::OP_NO_COMPRESSION)
    ssloptions |= OpenSSL::SSL::OP_NO_COMPRESSION
  end

  # Map the human-readable version ("TLSv1.1") onto the symbol OpenSSL
  # knows (:TLSv1_1). See https://github.com/jruby/jruby/issues/1874
  version = OpenSSL::SSL::SSLContext::METHODS.find { |x| x.to_s.gsub("_",".") == settings[:ssl_version] }
  raise InvalidConfiguration, "Invalid SSL/TLS version '#{settings[:ssl_version]}'" if version.nil?
  sslcontext.ssl_version = version

  # We have to set ciphers *after* setting ssl_version because setting
  # ssl_version will reset the cipher set.
  sslcontext.options = ssloptions
  sslcontext.ciphers = settings[:ciphers]

  sslcontext.verify_callback = proc do |*args|
    @logger.debug("Verify peer via FTW::Connection#secure", :callback => settings[:verify_callback])
    if settings[:verify_callback].respond_to?(:call)
      settings[:verify_callback].call(*args)
    else
      # No custom callback: go with OpenSSL's own verdict, which it passes
      # as the first argument (preverify_ok). Returning nil here would
      # reject every certificate.
      args.first
    end
  end
  sslcontext.cert_store = settings[:certificate_store]

  if settings.include?(:certificate) && settings.include?(:key)
    sslcontext.cert = settings[:certificate]
    sslcontext.key = settings[:key]
  end

  @socket = OpenSSL::SSL::SSLSocket.new(@socket, sslcontext)

  # TODO(sissel): Set up local certificat/key stuff. This is required for
  # server-side ssl operation, I think.

  if client?
    do_secure(:connect_nonblock, settings[:timeout])
  else
    do_secure(:accept_nonblock, settings[:timeout])
  end
end # def secure
# Run the TLS handshake on the (already wrapped) socket.
#
# The handshake method for OpenSSL::SSL::SSLSocket is different depending
# on the mode (client or server).
#
# @param [Symbol] handshake_method The method to call on the socket to
#   complete the ssl handshake. See OpenSSL::SSL::SSLSocket#connect_nonblock
#   or #accept_nonblock for more details
# @param [Numeric, nil] timeout overall budget in seconds for the whole
#   handshake; nil waits indefinitely
# @return [true] once the handshake completed (the connection is then
#   marked as secured)
# @raise [SecureHandshakeTimeout] if the handshake does not finish in time
def do_secure(handshake_method, timeout=nil)
  # SSLSocket#connect_nonblock will do the SSL/TLS handshake.
  # TODO(sissel): refactor this into a method that both secure and connect
  # methods can call.
  start = Time.now
  begin
    @socket.send(handshake_method)
  rescue IO::WaitReadable, IO::WaitWritable => wait
    # The handshake would block. Unlike Socket#connect_nonblock (which only
    # ever waits for writability), a TLS handshake may need to wait for
    # either direction, so both are rescued and the call is retried until
    # it completes or the timeout budget is used up.
    time_left = nil
    if !timeout.nil?
      time_left = timeout - (Time.now - start)
      raise SecureHandshakeTimeout.new if time_left < 0
    end

    # Wait only for the direction the handshake asked for. Waiting for
    # "readable or writable" returns immediately on a socket that is merely
    # writable and turns this loop into a busy spin.
    if wait.is_a?(IO::WaitReadable)
      ready = IO.select([@socket], nil, nil, time_left)
    else
      ready = IO.select(nil, [@socket], nil, time_left)
    end

    # IO.select returns nil when the timeout expires.
    raise SecureHandshakeTimeout.new if ready.nil?

    # keep going now that the socket is ready
    retry
  rescue => e
    @logger.warn(e)
    raise e
  end

  @secure = true
end # def do_secure
# Reports the TLS state flag (@secure), set once a handshake completed.
def secured?
  @secure
end # def secured?
# True when this connection operates in :client mode.
def client?
  :client == @mode
end # def client?
# True when this connection operates in :server mode.
def server?
  :server == @mode
end # def server?
# The instance methods above are defined after a bare 'private'; these are
# the ones that make up the public API. Methods not listed here (setup,
# do_secure) stay private.
public(:connect, :connected?, :write, :read, :pushback, :disconnect,
:writable?, :readable?, :peer, :to_io, :secure, :secured?,
:client?, :server?)
end # class FTW::Connection
|