File: ruby_connection.rb

package info (click to toggle)
ruby-redis-client 0.28.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,116 kB
  • sloc: ansic: 7,517; ruby: 5,801; makefile: 246; sh: 123
file content (193 lines) | stat: -rw-r--r-- 6,130 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# frozen_string_literal: true

require "socket"
require "openssl"
require "redis_client/connection_mixin"
require "redis_client/ruby_connection/buffered_io"
require "redis_client/ruby_connection/resp3"

class RedisClient
  class RubyConnection
    include ConnectionMixin

    class << self
      # Builds the OpenSSL::SSL::SSLContext described by +ssl_params+.
      #
      # +:cert+ and +:key+ may be given as ready-made OpenSSL objects or as
      # Strings. A String is read from disk when it names an existing file and
      # is otherwise handed to OpenSSL as the certificate/key data itself.
      #
      # @param ssl_params [Hash, nil] options forwarded to
      #   OpenSSL::SSL::SSLContext#set_params; the caller's hash is not mutated.
      # @return [OpenSSL::SSL::SSLContext]
      def ssl_context(ssl_params)
        params = ssl_params.dup || {} # nil.dup is nil, so a nil argument becomes {}

        # Binary (DER) data typically contains NUL bytes, and File.exist? raises
        # ArgumentError for such strings. They can never be file paths, so skip
        # the filesystem probe and treat them as inline data.
        inline_or_file = lambda do |value|
          (value.include?("\0") || !File.exist?(value)) ? value : File.read(value)
        end

        cert = params[:cert]
        params[:cert] = OpenSSL::X509::Certificate.new(inline_or_file.call(cert)) if cert.is_a?(String)

        key = params[:key]
        params[:key] = OpenSSL::PKey.read(inline_or_file.call(key)) if key.is_a?(String)

        context = OpenSSL::SSL::SSLContext.new
        context.set_params(params)
        if context.verify_mode != OpenSSL::SSL::VERIFY_NONE
          if context.respond_to?(:verify_hostname) # Missing on JRuby
            # NOTE(review): this only reads the attribute and discards the
            # result; the effective value is whatever set_params applied above.
            # Confirm whether an assignment was intended before changing it.
            context.verify_hostname
          end
        end

        context
      end
    end

    # True when this Ruby's Socket.tcp accepts a +resolv_timeout+ parameter.
    SUPPORTS_RESOLV_TIMEOUT = Socket.method(:tcp).parameters.map(&:last).include?(:resolv_timeout)

    # Stores the three timeouts and opens the connection right away.
    #
    # @param config the client configuration, passed on to the inherited initializer
    # @param connect_timeout timeout used while establishing the connection
    # @param read_timeout timeout handed to the buffered IO for reads
    # @param write_timeout timeout handed to the buffered IO for writes
    def initialize(config, connect_timeout:, read_timeout:, write_timeout:)
      super(config)
      @connect_timeout, @read_timeout, @write_timeout = connect_timeout, read_timeout, write_timeout
      connect
    end

    # Tells whether the underlying IO is still open.
    #
    # @return [Boolean] false once the IO reports itself closed
    def connected?
      return false if @io.closed?

      true
    end

    # Closes the underlying IO first, then lets the inherited #close run.
    #
    # @return whatever the inherited #close returns
    def close
      @io.close
      super()
    end

    # Remembers the new read timeout and applies it to the live IO, if any.
    #
    # @param timeout the read timeout to use from now on
    def read_timeout=(timeout)
      @read_timeout = timeout
      return unless @io

      @io.read_timeout = timeout
    end

    # Remembers the new write timeout and applies it to the live IO, if any.
    #
    # @param timeout the write timeout to use from now on
    def write_timeout=(timeout)
      @write_timeout = timeout
      return unless @io

      @io.write_timeout = timeout
    end

    # Serializes one command with RESP3 and sends it over the IO.
    #
    # Low-level socket/TLS failures are re-raised as the error built by
    # #connection_error. Library errors get the config and the current retry
    # attempt attached before being re-raised unchanged.
    #
    # @param command the command to serialize, in the form RESP3.dump accepts
    # @return whatever the IO's #write returns
    def write(command)
      payload = RESP3.dump(command)
      begin
        @io.write(payload)
      rescue SystemCallError, IOError, OpenSSL::SSL::SSLError => io_failure
        raise connection_error(io_failure.message)
      rescue Error => client_error
        client_error._set_config(config)
        client_error._set_retry_attempt(@retry_attempt)
        raise
      end
    end

    # Serializes several commands into one buffer and sends them with a single
    # write.
    #
    # Errors are handled the same way as in #write: low-level socket/TLS
    # failures are re-raised as the error built by #connection_error, and
    # library errors get the config and current retry attempt attached before
    # being re-raised.
    #
    # @param commands an enumerable of commands, each in the form RESP3.dump accepts
    # @return whatever the IO's #write returns, or nil when nothing was serialized
    def write_multi(commands)
      buffer = commands.reduce(nil) { |accumulated, command| RESP3.dump(command, accumulated) }
      # Nothing was serialized (empty command list): there is nothing to send,
      # and a nil buffer must not be handed to the IO.
      return if buffer.nil?

      begin
        @io.write(buffer)
      rescue SystemCallError, IOError, OpenSSL::SSL::SSLError => error
        raise connection_error(error.message)
      rescue Error => error
        error._set_config(config)
        error._set_retry_attempt(@retry_attempt)
        raise error
      end
    end

    # Reads and decodes one RESP3 value from the IO.
    #
    # @param timeout [nil, Object] when given, the read runs inside
    #   the IO's #with_timeout using this value; nil keeps the IO's own timeout
    # @return the decoded value
    # @raise the error built by #protocol_error when RESP3 decoding fails
    # @raise the error built by #connection_error on socket/TLS failures
    def read(timeout = nil)
      return RESP3.load(@io) if timeout.nil?

      @io.with_timeout(timeout) { RESP3.load(@io) }
    rescue RedisClient::RESP3::Error => decoding_error
      raise protocol_error(decoding_error.message)
    rescue SystemCallError, IOError, OpenSSL::SSL::SSLError => io_failure
      raise connection_error(io_failure.message)
    end

    # Times a PING round trip, using the connection's read timeout for the call.
    #
    # @return the difference between RedisClient.now_ms after and before the PING
    def measure_round_trip_delay
      started_at = RedisClient.now_ms
      call(["PING"], @read_timeout)
      finished_at = RedisClient.now_ms
      finished_at - started_at
    end

    private

    # Opens the transport to the server and wraps it in a BufferedIO.
    #
    # Uses a UNIX socket when the config has a path, otherwise a TCP socket
    # (with TCP_NODELAY and keep-alive enabled). When the config enables SSL the
    # socket is wrapped in an SSLSocket and the handshake is driven without
    # blocking, waiting at most +@connect_timeout+ for each readiness wait.
    #
    # Whatever the failure, the underlying OS socket is closed before the error
    # leaves this method, so a failed attempt does not leave a descriptor open.
    #
    # @return [true]
    # @raise [CannotConnectError] on socket, name-resolution or TLS errors and
    #   when a handshake wait times out
    # @raise [RuntimeError] if the handshake reports an unexpected status
    def connect
      @server_key = @config.server_key
      established = false
      # Tracked separately from the (possibly TLS-wrapped) socket so that it can
      # always be closed on failure.
      raw_socket = nil

      raw_socket = if @config.path
        UNIXSocket.new(@config.path)
      elsif SUPPORTS_RESOLV_TIMEOUT
        begin
          Socket.tcp(@config.host, @config.port, connect_timeout: @connect_timeout, resolv_timeout: @connect_timeout)
        rescue Errno::ETIMEDOUT => timeout_error
          timeout_error.message << ": #{@connect_timeout}s"
          raise
        end
      else
        Socket.tcp(@config.host, @config.port, connect_timeout: @connect_timeout)
      end

      unless @config.path
        # disables Nagle's Algorithm, prevents multiple round trips with MULTI
        raw_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
        enable_socket_keep_alive(raw_socket)
      end

      socket = raw_socket
      if @config.ssl
        socket = OpenSSL::SSL::SSLSocket.new(raw_socket, @config.ssl_context)
        socket.hostname = @config.host
        loop do
          case status = socket.connect_nonblock(exception: false)
          when :wait_readable
            socket.to_io.wait_readable(@connect_timeout) or raise CannotConnectError.with_config("", config)
          when :wait_writable
            socket.to_io.wait_writable(@connect_timeout) or raise CannotConnectError.with_config("", config)
          when socket
            break
          else
            raise "Unexpected `connect_nonblock` return: #{status.inspect}"
          end
        end
      end

      @io = BufferedIO.new(
        socket,
        read_timeout: @read_timeout,
        write_timeout: @write_timeout,
      )
      established = true
      true
    rescue SystemCallError, IOError, OpenSSL::SSL::SSLError, SocketError => error
      raise CannotConnectError, error.message, error.backtrace
    ensure
      # Close the raw socket rather than the SSLSocket: SSLSocket#close leaves
      # the wrapped IO open unless sync_close is set. Doing it in `ensure` also
      # covers errors the rescue above does not handle (handshake timeouts,
      # unexpected handshake status).
      raw_socket.close if raw_socket && !established && !raw_socket.closed?
    end

    # TCP keep-alive tuning used by enable_socket_keep_alive:
    # KEEP_ALIVE_INTERVAL is passed as TCP_KEEPINTVL, KEEP_ALIVE_TTL as
    # TCP_KEEPIDLE and KEEP_ALIVE_PROBES as TCP_KEEPCNT, where the platform
    # defines those options.
    KEEP_ALIVE_INTERVAL = 15 # Same as hiredis defaults
    KEEP_ALIVE_TTL = 120 # Longer than hiredis defaults
    # Integer division: (120 / 15) - 1 = 7
    KEEP_ALIVE_PROBES = (KEEP_ALIVE_TTL / KEEP_ALIVE_INTERVAL) - 1
    # The tuning values are hidden from code outside this class.
    private_constant :KEEP_ALIVE_INTERVAL
    private_constant :KEEP_ALIVE_TTL
    private_constant :KEEP_ALIVE_PROBES

    # enable_socket_keep_alive(socket) is defined once, at load time, in the
    # richest variant the platform's Socket constants allow. Every variant that
    # does anything turns on SO_KEEPALIVE; the more capable ones also tune the
    # probe timing with the KEEP_ALIVE_* values.
    socket_defines = ->(*names) { names.all? { |name| Socket.const_defined?(name) } }

    if socket_defines.call(:SOL_TCP, :SOL_SOCKET, :TCP_KEEPIDLE, :TCP_KEEPINTVL, :TCP_KEEPCNT) # Linux
      # Enables keep-alive and sets idle time, probe interval and probe count.
      def enable_socket_keep_alive(socket)
        socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
        socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, KEEP_ALIVE_TTL)
        socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, KEEP_ALIVE_INTERVAL)
        socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, KEEP_ALIVE_PROBES)
      end
    elsif socket_defines.call(:IPPROTO_TCP, :TCP_KEEPINTVL, :TCP_KEEPCNT) # macOS
      # Enables keep-alive and sets probe interval and probe count.
      def enable_socket_keep_alive(socket)
        socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, KEEP_ALIVE_INTERVAL)
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, KEEP_ALIVE_PROBES)
      end
    elsif socket_defines.call(:SOL_SOCKET, :SO_KEEPALIVE) # unknown POSIX
      # Enables keep-alive with the system's default timing.
      def enable_socket_keep_alive(socket)
        socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
      end
    else # unknown
      # No keep-alive support detected: intentionally does nothing.
      def enable_socket_keep_alive(_socket)
      end
    end
  end
end