1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
|
# frozen_string_literal: true
require "socket"
require "openssl"
require "redis_client/connection_mixin"
require "redis_client/ruby_connection/buffered_io"
require "redis_client/ruby_connection/resp3"
class RedisClient
class RubyConnection
include ConnectionMixin
class << self
# Builds an OpenSSL::SSL::SSLContext from a hash of SSL options.
#
# ssl_params - nil, or a Hash of options understood by
#              OpenSSL::SSL::SSLContext#set_params. The :cert and :key
#              entries may each be an already-built OpenSSL object, a path
#              to an existing file, or a String holding the encoded
#              certificate / key itself.
#
# Returns the configured context. The caller's hash is not mutated.
def ssl_context(ssl_params)
  params = ssl_params ? ssl_params.dup : {}

  cert = params[:cert]
  if cert.is_a?(String)
    # A String is either a path on disk or the certificate body itself.
    cert = File.read(cert) if File.exist?(cert)
    params[:cert] = OpenSSL::X509::Certificate.new(cert)
  end

  key = params[:key]
  if key.is_a?(String)
    # Same convention as :cert: a path on disk or the key body itself.
    key = File.read(key) if File.exist?(key)
    params[:key] = OpenSSL::PKey.read(key)
  end

  context = OpenSSL::SSL::SSLContext.new
  context.set_params(params)

  # When peers are verified, hostname verification must be on as well.
  # The previous code only *read* the attribute, which had no effect.
  # An explicit choice made by the caller is left untouched.
  hostname_configured = params.key?(:verify_hostname) || params.key?("verify_hostname")
  if context.verify_mode != OpenSSL::SSL::VERIFY_NONE &&
      !hostname_configured &&
      context.respond_to?(:verify_hostname=) # Missing on JRuby
    context.verify_hostname = true
  end

  context
end
end
# True when this Ruby's Socket.tcp declares a :resolv_timeout parameter.
# #connect checks it before passing resolv_timeout: to Socket.tcp.
SUPPORTS_RESOLV_TIMEOUT = Socket.method(:tcp).parameters.any? { |p| p.last == :resolv_timeout }
# The configuration object given to #initialize.
attr_reader :config
# Records the configuration and the three timeouts, then connects right
# away, so constructing an instance also opens the connection.
#
# config          - the configuration object (exposed through #config).
# connect_timeout - timeout used while establishing the connection.
# read_timeout    - initial read timeout handed to the buffered IO.
# write_timeout   - initial write timeout handed to the buffered IO.
def initialize(config, connect_timeout:, read_timeout:, write_timeout:)
  super()

  @write_timeout = write_timeout
  @read_timeout = read_timeout
  @connect_timeout = connect_timeout
  @config = config

  connect
end
# Returns true while the wrapped IO has not been closed, false otherwise.
def connected?
  io_closed = @io.closed?
  !io_closed
end
# Closes the wrapped IO first, then hands over to the inherited #close
# (via super) for whatever additional teardown it performs.
def close
@io.close
super
end
# Remembers the new read timeout and, when an IO is already present,
# forwards the value to it as well.
def read_timeout=(timeout)
  @read_timeout = timeout
  @io&.read_timeout = timeout
end
# Remembers the new write timeout and, when an IO is already present,
# forwards the value to it as well.
def write_timeout=(timeout)
  @write_timeout = timeout
  @io&.write_timeout = timeout
end
# Serializes a single command with RESP3.dump and writes it to the IO.
#
# Serialization happens before the guarded section, so only failures of
# the write itself (system call, IO or TLS errors) are converted into a
# ConnectionError carrying the connection's config.
def write(command)
  payload = RESP3.dump(command)

  begin
    @io.write(payload)
  rescue SystemCallError, IOError, OpenSSL::SSL::SSLError => io_error
    raise ConnectionError.with_config(io_error.message, config)
  end
end
# Serializes every command into one shared buffer (each RESP3.dump call
# receives the buffer produced so far) and writes the result in a single
# IO write.
#
# Only failures of the write itself (system call, IO or TLS errors) are
# converted into a ConnectionError carrying the connection's config.
def write_multi(commands)
  payload = commands.reduce(nil) { |accumulated, command| RESP3.dump(command, accumulated) }

  begin
    @io.write(payload)
  rescue SystemCallError, IOError, OpenSSL::SSL::SSLError => io_error
    raise ConnectionError.with_config(io_error.message, config)
  end
end
# Reads and decodes one reply from the IO with RESP3.load.
#
# timeout - when given, the read runs inside @io.with_timeout(timeout);
#           when nil, the IO is read without that wrapper.
#
# An unknown RESP3 type is re-raised as RedisClient::ProtocolError; system
# call, IO and TLS errors are re-raised as ConnectionError. Both carry the
# connection's config.
def read(timeout = nil)
  return RESP3.load(@io) if timeout.nil?

  @io.with_timeout(timeout) { RESP3.load(@io) }
rescue RedisClient::RESP3::UnknownType => protocol_failure
  raise RedisClient::ProtocolError.with_config(protocol_failure.message, config)
rescue SystemCallError, IOError, OpenSSL::SSL::SSLError => io_failure
  raise ConnectionError.with_config(io_failure.message, config)
end
# Sends a PING (using the current read timeout) and returns how long the
# round trip took, in milliseconds as a Float, measured on the monotonic
# clock.
def measure_round_trip_delay
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  call(["PING"], @read_timeout)
  finished_at = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  finished_at - started_at
end
private
# Opens the transport socket, optionally upgrades it to TLS, and wraps the
# result in a BufferedIO stored in @io.
#
# A UNIX socket is used when the config has a path; otherwise a TCP socket
# is opened to config host/port with TCP_NODELAY and keep-alive enabled.
# When config.ssl is set, a non-blocking TLS handshake is driven to
# completion, waiting at most @connect_timeout for each readiness event.
#
# Returns true on success.
# Raises CannotConnectError on system call, socket or TLS errors, and when
# the TLS handshake times out. Whatever the failure, the socket opened
# here is closed before the error propagates.
def connect
  unix_path = @config.path

  # Assign `socket` before configuring it, so the rescue clauses below can
  # close it even when one of the setsockopt calls fails.
  socket = if unix_path
    UNIXSocket.new(unix_path)
  elsif SUPPORTS_RESOLV_TIMEOUT
    Socket.tcp(@config.host, @config.port, connect_timeout: @connect_timeout, resolv_timeout: @connect_timeout)
  else
    Socket.tcp(@config.host, @config.port, connect_timeout: @connect_timeout)
  end

  unless unix_path
    # disables Nagle's Algorithm, prevents multiple round trips with MULTI
    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
    enable_socket_keep_alive(socket)
  end

  if @config.ssl
    socket = OpenSSL::SSL::SSLSocket.new(socket, @config.ssl_context)
    socket.hostname = @config.host
    loop do
      case status = socket.connect_nonblock(exception: false)
      when :wait_readable
        socket.to_io.wait_readable(@connect_timeout) or raise CannotConnectError.with_config("", config)
      when :wait_writable
        socket.to_io.wait_writable(@connect_timeout) or raise CannotConnectError.with_config("", config)
      when socket
        # connect_nonblock returns the socket itself once the handshake is done.
        break
      else
        raise "Unexpected `connect_nonblock` return: #{status.inspect}"
      end
    end
  end

  @io = BufferedIO.new(
    socket,
    read_timeout: @read_timeout,
    write_timeout: @write_timeout,
  )
  true
rescue SystemCallError, OpenSSL::SSL::SSLError, SocketError => error
  # Close through #to_io: for a TLS socket this is the underlying transport,
  # which SSLSocket#close does not close unless sync_close is enabled.
  socket&.to_io&.close
  raise CannotConnectError, error.message, error.backtrace
rescue StandardError
  # Handshake timeouts and unexpected handshake statuses are raised from
  # inside the loop above and are not covered by the clause above; release
  # the socket for those too, then let the original error propagate.
  socket&.to_io&.close
  raise
end
# Keep-alive tuning consumed by #enable_socket_keep_alive.
# INTERVAL is the value passed for TCP_KEEPIDLE and TCP_KEEPINTVL on
# platforms where those options are set.
KEEP_ALIVE_INTERVAL = 15 # Same as hiredis defaults
# TTL is only used to derive the probe count below.
KEEP_ALIVE_TTL = 120 # Longer than hiredis defaults
# Value passed for TCP_KEEPCNT: (120 / 15) - 1 = 7.
KEEP_ALIVE_PROBES = (KEEP_ALIVE_TTL / KEEP_ALIVE_INTERVAL) - 1
# These are implementation details, so they are hidden from outside the class.
private_constant :KEEP_ALIVE_INTERVAL
private_constant :KEEP_ALIVE_TTL
private_constant :KEEP_ALIVE_PROBES
# Defines #enable_socket_keep_alive once, at load time, choosing the variant
# that matches the socket constants this platform provides. Richer variants
# are tried first; the last one is a no-op.
socket_has = ->(*names) { names.all? { |name| Socket.const_defined?(name) } }

if socket_has.call(:SOL_TCP, :SOL_SOCKET, :TCP_KEEPIDLE, :TCP_KEEPINTVL, :TCP_KEEPCNT) # Linux
  # Turns keep-alive on and sets idle time, probe interval and probe count.
  def enable_socket_keep_alive(socket)
    socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
    socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, KEEP_ALIVE_INTERVAL)
    socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, KEEP_ALIVE_INTERVAL)
    socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, KEEP_ALIVE_PROBES)
  end
elsif socket_has.call(:IPPROTO_TCP, :TCP_KEEPINTVL, :TCP_KEEPCNT) # macOS
  # Turns keep-alive on and sets probe interval and probe count.
  def enable_socket_keep_alive(socket)
    socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, KEEP_ALIVE_INTERVAL)
    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, KEEP_ALIVE_PROBES)
  end
elsif socket_has.call(:SOL_SOCKET, :SO_KEEPALIVE) # unknown POSIX
  # Turns keep-alive on and leaves its timing at the system defaults.
  def enable_socket_keep_alive(socket)
    socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
  end
else # unknown
  # Nothing to configure on this platform.
  def enable_socket_keep_alive(_socket)
  end
end
end
end
|