File: ruby_connection.rb

package info (click to toggle)
ruby-redis-client 0.22.2-1.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 224 kB
  • sloc: ruby: 2,079; makefile: 4
file content (186 lines) | stat: -rw-r--r-- 6,028 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# frozen_string_literal: true

require "socket"
require "openssl"
require "redis_client/connection_mixin"
require "redis_client/ruby_connection/buffered_io"
require "redis_client/ruby_connection/resp3"

class RedisClient
  class RubyConnection
    include ConnectionMixin

    class << self
      def ssl_context(ssl_params)
        params = ssl_params.dup || {}

        cert = params[:cert]
        if cert.is_a?(String)
          cert = File.read(cert) if File.exist?(cert)
          params[:cert] = OpenSSL::X509::Certificate.new(cert)
        end

        key = params[:key]
        if key.is_a?(String)
          key = File.read(key) if File.exist?(key)
          params[:key] = OpenSSL::PKey.read(key)
        end

        context = OpenSSL::SSL::SSLContext.new
        context.set_params(params)
        if context.verify_mode != OpenSSL::SSL::VERIFY_NONE
          if context.respond_to?(:verify_hostname) # Missing on JRuby
            context.verify_hostname
          end
        end

        context
      end
    end

    SUPPORTS_RESOLV_TIMEOUT = Socket.method(:tcp).parameters.any? { |p| p.last == :resolv_timeout }

    attr_reader :config

    def initialize(config, connect_timeout:, read_timeout:, write_timeout:)
      super()
      @config = config
      @connect_timeout = connect_timeout
      @read_timeout = read_timeout
      @write_timeout = write_timeout
      connect
    end

    def connected?
      !@io.closed?
    end

    def close
      @io.close
      super
    end

    def read_timeout=(timeout)
      @read_timeout = timeout
      @io.read_timeout = timeout if @io
    end

    def write_timeout=(timeout)
      @write_timeout = timeout
      @io.write_timeout = timeout if @io
    end

    def write(command)
      buffer = RESP3.dump(command)
      begin
        @io.write(buffer)
      rescue SystemCallError, IOError, OpenSSL::SSL::SSLError => error
        raise ConnectionError.with_config(error.message, config)
      end
    end

    def write_multi(commands)
      buffer = nil
      commands.each do |command|
        buffer = RESP3.dump(command, buffer)
      end
      begin
        @io.write(buffer)
      rescue SystemCallError, IOError, OpenSSL::SSL::SSLError => error
        raise ConnectionError.with_config(error.message, config)
      end
    end

    def read(timeout = nil)
      if timeout.nil?
        RESP3.load(@io)
      else
        @io.with_timeout(timeout) { RESP3.load(@io) }
      end
    rescue RedisClient::RESP3::UnknownType => error
      raise RedisClient::ProtocolError.with_config(error.message, config)
    rescue SystemCallError, IOError, OpenSSL::SSL::SSLError => error
      raise ConnectionError.with_config(error.message, config)
    end

    def measure_round_trip_delay
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
      call(["PING"], @read_timeout)
      Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - start
    end

    private

    def connect
      socket = if @config.path
        UNIXSocket.new(@config.path)
      else
        sock = if SUPPORTS_RESOLV_TIMEOUT
          Socket.tcp(@config.host, @config.port, connect_timeout: @connect_timeout, resolv_timeout: @connect_timeout)
        else
          Socket.tcp(@config.host, @config.port, connect_timeout: @connect_timeout)
        end
        # disables Nagle's Algorithm, prevents multiple round trips with MULTI
        sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
        enable_socket_keep_alive(sock)
        sock
      end

      if @config.ssl
        socket = OpenSSL::SSL::SSLSocket.new(socket, @config.ssl_context)
        socket.hostname = @config.host
        loop do
          case status = socket.connect_nonblock(exception: false)
          when :wait_readable
            socket.to_io.wait_readable(@connect_timeout) or raise CannotConnectError.with_config("", config)
          when :wait_writable
            socket.to_io.wait_writable(@connect_timeout) or raise CannotConnectError.with_config("", config)
          when socket
            break
          else
            raise "Unexpected `connect_nonblock` return: #{status.inspect}"
          end
        end
      end

      @io = BufferedIO.new(
        socket,
        read_timeout: @read_timeout,
        write_timeout: @write_timeout,
      )
      true
    rescue SystemCallError, OpenSSL::SSL::SSLError, SocketError => error
      socket&.close
      raise CannotConnectError, error.message, error.backtrace
    end

    KEEP_ALIVE_INTERVAL = 15 # Same as hiredis defaults
    KEEP_ALIVE_TTL = 120 # Longer than hiredis defaults
    KEEP_ALIVE_PROBES = (KEEP_ALIVE_TTL / KEEP_ALIVE_INTERVAL) - 1
    private_constant :KEEP_ALIVE_INTERVAL
    private_constant :KEEP_ALIVE_TTL
    private_constant :KEEP_ALIVE_PROBES

    if %i[SOL_TCP SOL_SOCKET TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT].all? { |c| Socket.const_defined? c } # Linux
      def enable_socket_keep_alive(socket)
        socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
        socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, KEEP_ALIVE_INTERVAL)
        socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, KEEP_ALIVE_INTERVAL)
        socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, KEEP_ALIVE_PROBES)
      end
    elsif %i[IPPROTO_TCP TCP_KEEPINTVL TCP_KEEPCNT].all? { |c| Socket.const_defined? c } # macOS
      def enable_socket_keep_alive(socket)
        socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, KEEP_ALIVE_INTERVAL)
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, KEEP_ALIVE_PROBES)
      end
    elsif %i[SOL_SOCKET SO_KEEPALIVE].all? { |c| Socket.const_defined? c } # unknown POSIX
      def enable_socket_keep_alive(socket)
        socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
      end
    else # unknown
      def enable_socket_keep_alive(_socket)
      end
    end
  end
end