File: connection.rb

package info (click to toggle)
ruby-ftw 0.0.49-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 548 kB
  • sloc: ruby: 1,922; makefile: 5
file content (466 lines) | stat: -rw-r--r-- 15,657 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
require "cabin" # rubygem "cabin"
require "ftw/dns"
require "ftw/poolable"
require "ftw/namespace"
require "ftw/agent"
require "socket"
require "timeout" # ruby stdlib, just for the Timeout exception.

if RUBY_VERSION =~ /^1\.8/
  # for Array#rotate, IO::WaitWritable, etc, in ruby < 1.9
  require "backports"
end

require "openssl"

# A network connection. This is TCP.
#
# You can use IO::select on this objects of this type.
# (at least, in MRI you can)
#
# You can activate SSL/TLS on this connection by invoking FTW::Connection#secure
#
# This class also implements buffering itself because some IO-like classes
# (OpenSSL::SSL::SSLSocket) do not support IO#ungetbyte
class FTW::Connection
  include FTW::Poolable
  include Cabin::Inspectable

  # A connection attempt timed out
  class ConnectTimeout < StandardError; end
  
  # A connection attempt was rejected
  class ConnectRefused < StandardError; end

  # A read timed out
  class ReadTimeout < StandardError; end

  # A write timed out
  class WriteTimeout < StandardError; end

  # Secure setup timed out
  class SecureHandshakeTimeout < StandardError; end

  # Invalid connection configuration
  class InvalidConfiguration < StandardError; end

  private

  # A new network connection.
  # The 'destination' argument can be an array of strings or a single string.
  # String format is expected to be "host:port"
  #
  # Example:
  #
  #     conn = FTW::Connection.new(["1.2.3.4:80", "1.2.3.5:80"])
  #
  # If you specify multiple destinations, they are used in a round-robin
  # decision made during reconnection.
  def initialize(destinations)
    if destinations.is_a?(String)
      @destinations = [destinations]
    else
      @destinations = destinations
    end

    @mode = :client
    setup
  end # def initialize

  # Set up this connection.
  def setup
    @logger = Cabin::Channel.get
    @connect_timeout = 2

    # Use a fixed-size string that we set to BINARY encoding.
    # Not all byte sequences are UTF-8 friendly :0
    @read_size = 16384
    @read_buffer = " " * @read_size
    @pushback_buffer = ""

    # Tell Ruby 1.9 that this string is a binary string, not utf-8 or somesuch.
    if @read_buffer.respond_to?(:force_encoding)
      @read_buffer.force_encoding("BINARY")
    end

    @inspectables = [:@destinations, :@connected, :@remote_address, :@secure]
    @connected = false
    @remote_address = nil
    @secure = false

    # TODO(sissel): Validate @destinations
    # TODO(sissel): Barf if a destination is not of the form "host:port"
  end # def setup

  # Create a new connection from an existing IO instance (like a socket)
  # 
  # Valid modes are :server and :client.
  #
  # * specify :server if this connection is from a server (via Socket#accept)
  # * specify :client if this connection is from a client (via Socket#connect)
  def self.from_io(io, mode=:server)
    valid_modes = [:server, :client]
    if !valid_modes.include?(mode)
      raise InvalidArgument.new("Invalid connection mode '#{mode}'. Valid modes: #{valid_modes.inspect}")
    end

    connection = self.new(nil) # New connection with no destinations
    connection.instance_eval do
      @socket = io
      @connected = true
      port, address = Socket.unpack_sockaddr_in(io.getpeername)
      @remote_address = "#{address}:#{port}"
      @mode = mode
    end
    return connection
  end # def self.from_io

  # Connect now.
  #
  # Timeout value is optional. If no timeout is given, this method
  # blocks until a connection is successful or an error occurs.
  #
  # You should check the return value of this method to determine if
  # a connection was successful.
  #
  # Possible return values are on error include:
  #
  # * FTW::Connection::ConnectRefused
  # * FTW::Connection::ConnectTimeout
  #
  # @return [nil] if the connection was successful
  # @return [StandardError or subclass] if the connection failed
  def connect(timeout=nil)
    # TODO(sissel): Raise if we're already connected?
    disconnect("reconnecting") if connected?
    host, port = @destinations.first.split(":")
    @destinations = @destinations.rotate # round-robin

    # Do dns resolution on the host. If there are multiple
    # addresses resolved, return one at random.
    addresses = FTW::DNS.singleton.resolve(host)

    addresses.each do |address|
      # Try each address until one works.
      @remote_address = address
      # Addresses with colon ':' in them are assumed to be IPv6
      family = @remote_address.include?(":") ? Socket::AF_INET6 : Socket::AF_INET
      @logger.debug("Connecting", :address => @remote_address,
                    :host => host, :port => port, :family => family)
      @socket = Socket.new(family, Socket::SOCK_STREAM, 0)
      @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

      # This api is terrible. pack_sockaddr_in? This isn't C, man...
      @logger.debug("packing", :data => [port.to_i, @remote_address])
      sockaddr = Socket.pack_sockaddr_in(port.to_i, @remote_address)
      # TODO(sissel): Support local address binding

      # Connect with timeout
      begin
        @socket.connect_nonblock(sockaddr)
      rescue IO::WaitWritable, Errno::EINPROGRESS
        # Ruby actually raises Errno::EINPROGRESS, but for some reason
        # the documentation says to use this IO::WaitWritable thing...
        # I don't get it, but whatever :(

        writable = writable?(timeout)

        # http://jira.codehaus.org/browse/JRUBY-6528; IO.select doesn't behave
        # correctly on JRuby < 1.7, so work around it.
        if writable || (RUBY_PLATFORM == "java" and JRUBY_VERSION < "1.7.0")
          begin
            @socket.connect_nonblock(sockaddr) # check connection failure
          rescue Errno::EISCONN 
            # Ignore, we're already connected.
          rescue Errno::ECONNREFUSED => e
            # Fire 'disconnected' event with reason :refused
            @socket.close
            return ConnectRefused.new("#{host}[#{@remote_address}]:#{port}")
          rescue Errno::ETIMEDOUT
            # This occurs when the system's TCP timeout hits, we have no
            # control over this, as far as I can tell. *maybe* setsockopt(2)
            # has a flag for this, but I haven't checked..
            # TODO(sissel): We should instead do 'retry' unless we've exceeded
            # the timeout.
            @socket.close
            return ConnectTimeout.new("#{host}[#{@remote_address}]:#{port}")
          rescue Errno::EINPROGRESS
            # If we get here, it's likely JRuby version < 1.7.0. EINPROGRESS at
            # this point in the code means that we have timed out.
            @socket.close
            return ConnectTimeout.new("#{host}[#{@remote_address}]:#{port}")
          end
        else
          # Connection timeout;
          return ConnectTimeout.new("#{host}[#{@remote_address}]:#{port}")
        end

        # If no error at this point, we're now connected.
        @connected = true
        break
      end # addresses.each
    end 
    return nil
  end # def connect

  # Is this Connection connected?
  def connected?
    return @connected
  end # def connected?

  # Write data to this connection.
  # This method blocks until the write succeeds unless a timeout is given.
  #
  # This method is not guaranteed to have written the full data given.
  #
  # Returns the number of bytes written (See also IO#syswrite)
  def write(data, timeout=nil)
    #connect if !connected?
    if writable?(timeout)
      return @socket.syswrite(data)
    else
      raise FTW::Connection::WriteTimeout.new(self.inspect)
    end
  end # def write

  # Read data from this connection
  # This method blocks until the read succeeds unless a timeout is given.
  #
  # This method is not guaranteed to read exactly 'length' bytes. See
  # IO#sysread
  def read(length=16384, timeout=nil)
    data = ""
    data.force_encoding("BINARY") if data.respond_to?(:force_encoding)
    have_pushback = !@pushback_buffer.empty?
    if have_pushback
      data << @pushback_buffer
      @pushback_buffer = ""
      # We have data 'now' so don't wait.
      timeout = 0
    end

    if readable?(timeout)
      begin
        # Read at most 'length' data, so read less from the socket
        # We'll read less than 'length' if the pushback buffer has
        # data in it already.
        @socket.sysread(length - data.length, @read_buffer)
        data << @read_buffer
        return data
      rescue EOFError => e
        @socket.close
        @connected = false
        raise e
      end
    else
      if have_pushback
        return data
      else
        raise ReadTimeout.new
      end
    end
  end # def read

  # Push back some data onto the connection's read buffer.
  def pushback(data)
    @pushback_buffer << data
  end # def pushback

  # End this connection, specifying why.
  def disconnect(reason)
    io = @socket
    if @socket.is_a?(OpenSSL::SSL::SSLSocket)
      @socket.sysclose()
      io = @socket.io
    end
    begin 
      io.close_read
    rescue IOError => e
      # Ignore, perhaps we shouldn't ignore.
    end

    begin 
      io.close_write
    rescue IOError => e
      # Ignore, perhaps we shouldn't ignore.
    end
  end # def disconnect

  # Is this connection writable? Returns true if it is writable within
  # the timeout period. False otherwise.
  #
  # The time out is in seconds. Fractional seconds are OK.
  def writable?(timeout)
    readable, writable, errors = IO.select(nil, [@socket], nil, timeout)
    return !writable.nil?
  end # def writable?

  # Is this connection readable? Returns true if it is readable within
  # the timeout period. False otherwise.
  #
  # The time out is in seconds. Fractional seconds are OK.
  def readable?(timeout)
    readable, writable, errors = IO.select([@socket], nil, nil, timeout)
    return !readable.nil?
  end # def readable?

  # The host:port
  def peer
    return @remote_address
  end # def peer

  # Support 'to_io' so you can use IO::select on this object.
  def to_io
    return @socket
  end # def to_io

  # Secure this connection with TLS.
  #
  # Options:
  #
  # * :certificate_store, an OpenSSL::X509::Store
  # * :timeout, a timeout threshold in seconds.
  # * :ciphers, an OpenSSL ciphers string, see `openssl ciphers` manual for details.
  # * :ssl_version, any of: SSLv2, SSLv3, TLSv1, TLSv1.1, TLSv1.2
  # * :certificate, an OpenSSL::X509::Certificate
  # * :key, an OpenSSL::PKey (like OpenSSL::PKey::RSA)
  #
  # Both `certificate` and `key` are highly recommended if the connection
  # belongs to a server (not a client connection).
  #
  # Notes:
  # * Version may depend on your platform (openssl compilation settings, JVM
  #   version, export restrictions, etc) 
#   * Available ciphers will depend on your version of Ruby (or JRuby and JVM),
  #   OpenSSL, etc.
  def secure(options=nil)
    # Skip this if we're already secure.
    return if secured?

    defaults = {
      :timeout => nil,
      :ciphers => FTW::Agent::Configuration::SSL_CIPHER_MAP["MOZILLA_MODERN"],
      :ssl_version => "TLSv1.1"
    }
    settings = defaults.merge(options) unless options.nil?

    @logger.info("Securing this connection", :peer => peer, :options => settings)
    # Wrap this connection with TLS/SSL
    sslcontext = OpenSSL::SSL::SSLContext.new
    # If you use VERIFY_NONE, you are removing the trust feature of TLS. Don't do that.
    # Encryption without trust means you don't know who you are talking to.
    sslcontext.verify_mode = OpenSSL::SSL::VERIFY_PEER

    # ruby-core is refusing to patch ruby's default openssl settings to be more
    # secure, so let's fix that here. The next few lines setting options and
    # ciphers come from jmhodges' proposed patch
    ssloptions = OpenSSL::SSL::OP_ALL
    if defined?(OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS)
      ssloptions &= ~OpenSSL::SSL::OP_DONT_INSERT_EMPTY_FRAGMENTS
    end
    if defined?(OpenSSL::SSL::OP_NO_COMPRESSION)
      ssloptions |= OpenSSL::SSL::OP_NO_COMPRESSION
    end
    # https://github.com/jruby/jruby/issues/1874
    version = OpenSSL::SSL::SSLContext::METHODS.find { |x| x.to_s.gsub("_",".") == settings[:ssl_version] }
    raise InvalidConfiguration, "Invalid SSL/TLS version '#{settings[:ssl_version]}'" if version.nil?
    sslcontext.ssl_version = version

    # We have to set ciphers *after* setting ssl_version because setting
    # ssl_version will reset the cipher set.
    sslcontext.options = ssloptions
    sslcontext.ciphers = settings[:ciphers]

    sslcontext.verify_callback = proc do |*args| 
      @logger.debug("Verify peer via FTW::Connection#secure", :callback => settings[:verify_callback])
      if settings[:verify_callback].respond_to?(:call)
        settings[:verify_callback].call(*args)
      end
    end
    sslcontext.cert_store = settings[:certificate_store]

    if settings.include?(:certificate) && settings.include?(:key)
      sslcontext.cert = settings[:certificate]
      sslcontext.key = settings[:key]
    end

    @socket = OpenSSL::SSL::SSLSocket.new(@socket, sslcontext)

    # TODO(sissel): Set up local certificat/key stuff. This is required for
    # server-side ssl operation, I think.

    if client?
      do_secure(:connect_nonblock, settings[:timeout])
    else
      do_secure(:accept_nonblock, settings[:timeout])
    end
  end # def secure

  # Secure this connection.
  #
  # The handshake method for OpenSSL::SSL::SSLSocket is different depending
  # on the mode (client or server).
  #
  # @param [Symbol] handshake_method The method to call on the socket to
  #   complete the ssl handshake. See OpenSSL::SSL::SSLSocket#connect_nonblock
  #   of #accept_nonblock for more details
  def do_secure(handshake_method, timeout=nil)
    # SSLSocket#connect_nonblock will do the SSL/TLS handshake.
    # TODO(sissel): refactor this into a method that both secure and connect
    # methods can call.
    start = Time.now
    begin
      @socket.send(handshake_method)
    rescue IO::WaitReadable, IO::WaitWritable
      # The ruby OpenSSL docs for 1.9.3 have example code saying I should use
      # IO::WaitReadable, but in the real world it raises an SSLError with
      # a specific string message instead of Errno::EAGAIN or IO::WaitReadable
      # explicitly...
      #
      # This SSLSocket#connect_nonblock raising WaitReadable (Technically,
      # OpenSSL::SSL::SSLError) is in contrast to what Socket#connect_nonblock
      # raises, WaitWritable (ok, Errno::EINPROGRESS, technically)
      # Ruby's SSL exception for 'this call would block' is pretty shitty.
      #
      # So we rescue both IO::Wait{Readable,Writable} and keep trying
      # until timeout occurs.
      #
      
      if !timeout.nil?
        time_left = timeout - (Time.now - start)
        raise SecureHandshakeTimeout.new if time_left < 0
        r, w, e = IO.select([@socket], [@socket], nil, time_left)
      else
        r, w, e = IO.select([@socket], [@socket], nil, timeout)
      end

      # keep going if the socket is ready
      retry if r.size > 0 || w.size > 0
    rescue => e
      @logger.warn(e)
      raise e
    end

    @secure = true
  end # def do_secure

  # Has this connection been secured?
  def secured?
    return @secure
  end # def secured?

  # Is this a client connection?
  def client?
    return @mode == :client
  end # def client?

  # Is this a server connection?
  def server?
    return @mode == :server
  end # def server?

  public(:connect, :connected?, :write, :read, :pushback, :disconnect,
         :writable?, :readable?, :peer, :to_io, :secure, :secured?,
         :client?, :server?)
end # class FTW::Connection