File: tcp_socket.rb

package info (click to toggle)
ruby-riemann-client 0.2.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 148 kB
  • sloc: ruby: 730; makefile: 2
file content (334 lines) | stat: -rw-r--r-- 10,938 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
require 'socket'
require 'fcntl'

module Riemann
  class Client
    # TcpSocket: A specialized socket that has been configured with connect,
    # read and write timeouts and optional TCP keepalive settings.
    class TcpSocket
      # Base error for this socket; raised when a connection attempt fails.
      class Error < Riemann::Client::Error; end
      # Raised when a connect, read or write does not complete within its
      # timeout.
      class Timeout < Error; end

      # Internal:
      # The timeout for reading in seconds. Taken from the :read_timeout option,
      # falling back to :timeout and then to 2. Writable after construction.
      attr_accessor :read_timeout

      # Internal:
      # The timeout for connecting in seconds. Taken from the :connect_timeout
      # option, falling back to :timeout and then to 2.
      attr_reader :connect_timeout

      # Internal:
      # The timeout for writing in seconds. Taken from the :write_timeout
      # option, falling back to :timeout and then to 2.
      attr_reader :write_timeout

      # Internal:
      # The host this socket connects to (the :host option).
      attr_reader :host

      # Internal:
      # The port this socket connects to (the :port option).
      attr_reader :port

      # Internal
      #
      # Used for setting TCP_KEEPIDLE: overrides tcp_keepalive_time for a single
      # socket. Defaults to 60.
      #
      # http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
      #
      # tcp_keepalive_time:
      #
      #  The interval between the last data packet sent (simple ACKs are not
      #  considered data) and the first keepalive probe; after the connection is
      #  marked to need keepalive, this counter is not used any further.
      attr_reader :keepalive_idle

      # Internal
      #
      # Used for setting TCP_KEEPINTVL: overrides tcp_keepalive_intvl for a single
      # socket. Defaults to 30.
      #
      # http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
      #
      # tcp_keepalive_intvl:
      #
      #   The interval between subsequent keepalive probes, regardless of what
      #   the connection has exchanged in the meantime.
      attr_reader :keepalive_interval

      # Internal
      #
      # Used for setting TCP_KEEPCNT: overrides tcp_keepalive_probes for a single
      # socket. Defaults to 5.
      #
      # http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
      #
      # tcp_keepalive_probes:
      #
      #   The number of unacknowledged probes to send before considering the
      #   connection dead and notifying the application layer.
      attr_reader :keepalive_count


      # Internal: Build a new TcpSocket and immediately call #connect on it.
      #
      # options - the same Hash of options accepted by the constructor
      #
      # Returns the newly created TcpSocket instance (not the raw ::Socket that
      # #connect returns).
      def self.connect(options = {})
        new(options).tap { |instance| instance.connect }
      end

      # Internal: Creates a new TcpSocket. No connection is made here.
      #
      # options - a Hash of settings:
      #           :host, :port          - the destination
      #           :connect_timeout,
      #           :read_timeout,
      #           :write_timeout        - per-operation timeouts; each falls
      #                                   back to :timeout and then to 2
      #           :keepalive_active     - defaults to true
      #           :keepalive_idle       - defaults to 60
      #           :keepalive_interval   - defaults to 30
      #           :keepalive_count      - defaults to 5
      def initialize( options = {} )
        timeout_for = lambda { |key| options[key] || options[:timeout] || 2 }

        @host, @port = options[:host], options[:port]

        @connect_timeout = timeout_for.call(:connect_timeout)
        @read_timeout    = timeout_for.call(:read_timeout)
        @write_timeout   = timeout_for.call(:write_timeout)

        @keepalive_active = options.fetch(:keepalive_active, true)
        { :keepalive_idle => 60, :keepalive_interval => 30, :keepalive_count => 5 }.each do |name, fallback|
          instance_variable_set("@#{name}", options[name] || fallback)
        end

        # The raw ::Socket is created lazily on first use.
        @socket = nil
      end

      # Internal: Report the :keepalive_active setting captured at construction.
      #
      # Returns the stored flag as-is.
      def keepalive_active?
        return @keepalive_active
      end

      # Internal: Allocate an unconnected socket and apply our socket options.
      #
      # The socket created is an IPv4 TCP stream socket with:
      #
      #   * the close-on-exec flag set, so the descriptor is not inherited
      #     across an exec
      #   * Nagle's algorithm disabled (TCP_NODELAY)
      #   * TCP keepalive enabled and tuned from keepalive_idle,
      #     keepalive_interval and keepalive_count, but only when
      #     using_keepalive? says keepalive is usable
      #
      # Returns a new ::Socket instance
      def blank_socket
        ::Socket.new(::Socket::AF_INET, ::Socket::SOCK_STREAM, 0).tap do |fresh|
          fresh.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
          fresh.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)

          if using_keepalive?
            fresh.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true)

            # The constants are only referenced inside this branch because they
            # may be undefined on some platforms.
            tuning = [
              [::Socket::TCP_KEEPIDLE,  keepalive_idle],
              [::Socket::TCP_KEEPINTVL, keepalive_interval],
              [::Socket::TCP_KEEPCNT,   keepalive_count]
            ]
            tuning.each do |option, value|
              fresh.setsockopt(::Socket::SOL_TCP, option, value)
            end
          end
        end
      end

      # Internal: Return the connected raw Socket.
      #
      # If the socket is closed or non-existent a new connection is made and
      # remembered.
      #
      # Returns a ::Socket
      def socket
        return @socket unless closed?

        # Plain assignment, not `||=`: when @socket holds a closed (but non-nil)
        # socket, `||=` would keep it and hand the dead socket back to the
        # caller instead of reconnecting.
        @socket = connect
      end

      # Internal: Close the underlying ::Socket, if there is an open one, and
      # forget it.
      #
      # Returns nothing
      def close
        still_open = !closed?
        @socket.close if still_open
        @socket = nil
      end

      # Internal: Return true if there is no usable socket, i.e. none has been
      # created yet or the current one has been closed.
      #
      # Returns true or false
      def closed?
        (@socket.nil? || @socket.closed?) ? true : false
      end

      # Internal:
      #
      # Connect to the remote host in a non-blocking fashion.
      #
      # The host is resolved to its IPv4 TCP addresses and each one is tried in
      # turn, all sharing one deadline of connect_timeout seconds from now. The
      # first address that connects wins.
      #
      # Note that this method does not store the socket in @socket; callers
      # that want it remembered go through #socket.
      #
      # Raises the first recorded connection error if no address could be
      # connected to.
      #
      # Return the ::Socket on success
      def connect
        # Every attempt below shares this single deadline.
        deadline = Time.now.to_f + connect_timeout

        # Lookup destination address, restricted to IPv4 and TCP.
        candidates = ::Socket.getaddrinfo(host, port, ::Socket::AF_INET, ::Socket::SOCK_STREAM)
        failures   = []

        candidates.each do |candidate|
          connected = connect_or_error( candidate, deadline, failures )
          return connected if connected
        end

        raise failures.first
      end

      # Internal: Try one address, recording the failure instead of raising.
      #
      # addr     - An address returned from Socket.getaddrinfo()
      # deadline - the time (as a Float of epoch seconds) after which the
      #            attempt counts as timed out
      # errors   - an Array that the failure, if any, is appended to
      #
      # The time left until the deadline is used as the timeout for this single
      # attempt. If the deadline has already passed, a Timeout is recorded
      # without attempting a connection.
      #
      # Returns the socket on success. On an Error (which includes Timeout) the
      # exception is appended to errors and false is returned. Other exceptions
      # propagate.
      def connect_or_error( addr, deadline, errors )
        begin
          remaining = deadline - Time.now.to_f
          if remaining <= 0
            raise Timeout, "Could not connect to #{host}:#{port}"
          end
          connect_nonblock( addr, remaining )
        rescue Error => failure
          errors.push(failure)
          false
        end
      end

      # Internal: Connect to the given address within the timeout.
      #
      # addr    - An address returned from Socket.getaddrinfo(); the port is
      #           taken from index 1 and the IP address from index 3
      # timeout - the number of seconds to wait for the connection to complete
      #
      # A fresh socket from blank_socket is connected without blocking. When
      # the connect is still in progress we wait up to timeout seconds for the
      # socket to become writable and then confirm the connection.
      #
      # Raises Timeout if the socket does not become writable in time, and
      # Error for any other failure. The socket is closed in both cases.
      #
      # Return the ::Socket when it is connected
      def connect_nonblock( addr, timeout )
        port_number = addr[1]
        ip_address  = addr[3]
        sockaddr    = ::Socket.pack_sockaddr_in(port_number, ip_address)
        sock        = blank_socket()
        sock.connect_nonblock( sockaddr )
        sock
      rescue Errno::EINPROGRESS
        # IO.select returns nil only when the timeout expired.
        writable = IO.select(nil, [sock], nil, timeout)
        return connect_nonblock_finalize( sock, sockaddr ) unless writable.nil?

        begin
          sock.close
        rescue StandardError
          nil
        end
        raise Timeout, "Could not connect to #{host}:#{port} within #{timeout} seconds"
      rescue => ex
        # sock may still be nil here if allocation itself failed.
        begin
          sock.close
        rescue StandardError
          nil
        end
        raise Error, "Could not connect to #{host}:#{port}: #{ex.class}: #{ex.message}", ex.backtrace
      end


      # Internal: Make sure that a non-blocking connect has truly connected.
      #
      # sock     - the socket whose connect was in progress
      # sockaddr - the packed address the socket was being connected to
      #
      # The connect is issued a second time: either it succeeds or it reports
      # that the socket is already connected (EISCONN), and both mean we are
      # connected. Anything else is a failure; the socket is closed and an
      # Error is raised.
      #
      # Returns the socket
      def connect_nonblock_finalize( sock, sockaddr )
        begin
          sock.connect_nonblock( sockaddr )
        rescue Errno::EISCONN
          # Already connected, which is exactly what we wanted to confirm.
        rescue => ex
          begin
            sock.close
          rescue StandardError
            nil
          end
          raise Error, "Could not connect to #{host}:#{port}: #{ex.class}: #{ex.message}", ex.backtrace
        end
        sock
      end

      # Internal: Say whether TCP keepalive should be configured on new sockets.
      #
      # Keepalive is used only when the :keepalive_active option is set and
      # every ::Socket constant needed to configure it is defined. Some
      # operating systems do not define all of them, and in that case we must
      # not attempt to set the options at all.
      #
      # Returns true or false
      def using_keepalive?
        return false unless keepalive_active?

        required = [ :SOL_SOCKET, :SO_KEEPALIVE, :SOL_TCP, :TCP_KEEPIDLE, :TCP_KEEPINTVL, :TCP_KEEPCNT ]
        required.all? { |name| ::Socket.const_defined?(name) }
      end

      # Reads length bytes from the socket
      #
      # length - the number of bytes to read from the socket
      # outbuf - an optional buffer to store the bytes in; it is emptied first
      #
      # Data is gathered with repeated readpartial calls until length has been
      # collected, or until readpartial returns nothing.
      #
      # Returns the buffer holding the bytes read, which is outbuf itself when
      # one was given
      def read(length, outbuf = nil)
        collected = outbuf ? outbuf.replace('') : ''

        while collected.length < length
          chunk = readpartial(length - collected.length)
          break unless chunk

          collected << chunk
        end

        collected
      end

      # Internal: Read up to a maxlen of data from the socket and store it in outbuf
      #
      # maxlen - the maximum number of bytes to read from the socket
      # outbuf - the buffer in which to store the bytes.
      #
      # The read is non-blocking. When no data is available yet we wait up to
      # read_timeout seconds for the socket to become readable and try again.
      #
      # Raises Timeout if the socket does not become readable in time.
      #
      # Returns the bytes read
      def readpartial(maxlen, outbuf = nil)
        begin
          socket.read_nonblock(maxlen, outbuf)
        rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::ECONNRESET
          unless wait_readable(read_timeout)
            raise Timeout, "Could not read from #{host}:#{port} in #{read_timeout} seconds"
          end
          retry
        end
      end

      # Internal: Write the given data to the socket
      #
      # buf - the data to write to the socket; nil and empty strings are
      #       accepted and write nothing.
      #
      # The data is written with non-blocking writes, which may accept only
      # part of it; the unwritten remainder is written on the next pass. When
      # the socket is not ready we wait up to write_timeout seconds for it to
      # become writable and carry on with whatever is still unwritten.
      #
      # Raises Timeout if the socket does not become writable within the
      # write_timeout.
      #
      # returns nothing
      def write(buf)
        until buf.nil? || buf.bytesize.zero?
          written = socket.write_nonblock(buf)
          # write_nonblock reports BYTES written, so the remainder has to be
          # taken by byte offset. Slicing by character index would skip or
          # drop data whenever buf holds multibyte characters.
          buf = buf.byteslice(written, buf.bytesize)
        end
      rescue Errno::EWOULDBLOCK, Errno::EINTR, Errno::EAGAIN, Errno::ECONNRESET
        if wait_writable(write_timeout)
          # buf already holds only the unwritten remainder at this point.
          retry
        else
          raise Timeout, "Could not write to #{host}:#{port} in #{write_timeout} seconds"
        end
      end

      # Internal: Block until the current socket can be written to.
      #
      # timeout - the number of seconds to wait; write_timeout is used when
      #           this is nil
      #
      # Returns the IO.select result: nil if the wait timed out
      def wait_writable(timeout = nil)
        limit = timeout || write_timeout
        IO.select(nil, [@socket], nil, limit)
      end

      # Internal: Block until the current socket has data to read.
      #
      # timeout - the number of seconds to wait; read_timeout is used when this
      #           is nil
      #
      # Returns the IO.select result: nil if the wait timed out
      def wait_readable(timeout = nil)
        limit = timeout || read_timeout
        IO.select([@socket], nil, nil, limit)
      end
    end
  end
end