1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
|
# frozen_string_literal: true
require "monitor"
require "redis/errors"
require "redis/commands"
class Redis
# Directory that contains this file.
BASE_PATH = __dir__
# Error class raised by Redis.deprecate! when `raise_deprecations` is set.
Deprecated = Class.new(StandardError)
class << self
  # Class-level switches read by deprecate!: `silence_deprecations` suppresses
  # deprecation notices entirely, `raise_deprecations` turns them into exceptions.
  attr_accessor :silence_deprecations, :raise_deprecations

  # Report a deprecation.
  #
  # Does nothing when `silence_deprecations` is truthy. Otherwise raises
  # Deprecated when `raise_deprecations` is truthy, or emits the message
  # through Kernel.warn.
  #
  # @param message [String] the deprecation notice
  # @raise [Deprecated] when `raise_deprecations` is set and notices are not silenced
  def deprecate!(message)
    return if silence_deprecations
    raise Deprecated, message if raise_deprecations

    ::Kernel.warn(message)
  end
end
# soft-deprecated
# We added this back for older sidekiq releases
module Connection
  # List of available drivers.
  #
  # @return [Array] a one-element array holding RedisClient.default_driver
  def self.drivers
    [RedisClient.default_driver]
  end
end
# Mix the Commands module into Redis instances.
include Commands
# Option keys that designate the server to connect to. When any of them is
# passed to #initialize, the REDIS_URL environment variable is not used.
SERVER_URL_OPTIONS = %i(url host port path).freeze
# Create a new client instance
#
# The given options are copied before use, `:reconnect_attempts` falls back to 1,
# and the `REDIS_URL` environment variable is only used as `:url` when none of
# `:url`, `:host`, `:port` or `:path` was passed.
#
# @param [Hash] options
# @option options [String] :url (value of the environment variable REDIS_URL) a Redis URL, for a TCP connection:
#   `redis://:[password]@[hostname]:[port]/[db]` (password, port and database are optional), for a unix socket
#   connection: `unix://[path to Redis socket]`. This overrides all other options.
# @option options [String] :host ("127.0.0.1") server hostname
# @option options [Integer] :port (6379) server port
# @option options [String] :path path to server socket (overrides host and port)
# @option options [Float] :timeout (1.0) timeout in seconds
# @option options [Float] :connect_timeout (same as timeout) timeout for initial connect in seconds
# @option options [String] :username Username to authenticate against server
# @option options [String] :password Password to authenticate against server
# @option options [Integer] :db (0) Database to select after connect and on reconnects
# @option options [Symbol] :driver Driver to use, currently supported: `:ruby`, `:hiredis`
# @option options [String] :id ID for the client connection, assigns name to current connection by sending
#   `CLIENT SETNAME`
# @option options [Integer, Array<Integer, Float>] :reconnect_attempts (1) Number of attempts trying to connect,
#   or a list of sleep durations between attempts.
# @option options [Boolean] :inherit_socket (false) Whether to use socket in forked process or not
# @option options [String] :name The name of the server group to connect to.
# @option options [Array] :sentinels List of sentinels to contact
#
# @return [Redis] a new client instance
def initialize(options = {})
  @monitor = Monitor.new
  @subscription_client = nil

  # Work on a copy so the caller's hash is never modified.
  @options = options.dup
  @options[:reconnect_attempts] = 1 unless @options.key?(:reconnect_attempts)

  # Only fall back to the environment when no server location was passed in.
  env_url = ENV["REDIS_URL"]
  @options[:url] = env_url if env_url && SERVER_URL_OPTIONS.none? { |key| @options.key?(key) }

  # :inherit_socket is removed from the stored options before they reach the
  # client factory and is applied to the built client instead.
  use_inherited_socket = @options.delete(:inherit_socket)
  @client = initialize_client(@options)
  @client.inherit_socket! if use_inherited_socket
end
# Run code without the client reconnecting
#
# The block is handed to the underlying client's `disable_reconnection`.
#
# @yield the code to run
# @return whatever the underlying client's `disable_reconnection` returns
def without_reconnect(&block)
@client.disable_reconnection(&block)
end
# Test whether or not the client is connected
#
# Asks the main client first and only then the subscription client, if one exists.
#
# @return [Boolean, nil] truthy when either one reports being connected; the
#   result is `nil` when the main client is not connected and there is no
#   subscription client
def connected?
@client.connected? || @subscription_client&.connected?
end
# Disconnect the client as quickly and silently as possible.
#
# Closes the main client first and then the subscription client, if one exists.
def close
@client.close
@subscription_client&.close
end
# `disconnect!` is another name for `close`.
alias disconnect! close
# Yields this instance to the given block and returns the block's result.
# NOTE(review): presumably here so a bare client can be used wherever a
# pool-style `with { |conn| ... }` interface is expected — confirm.
#
# @yield [Redis] this instance
def with
yield self
end
# Returns the underlying client object held in @client.
# NOTE(review): the leading underscore suggests this is not meant as
# general-purpose public API — confirm before relying on it.
def _client
@client
end
# Runs the given block against a pipeline.
#
# While holding the monitor, opens a pipeline on the underlying client and
# yields a PipelinedConnection that wraps it.
#
# @param exception [Boolean] passed through unchanged to both the underlying
#   client's `pipelined` call and the PipelinedConnection
# @yield [PipelinedConnection] wrapper around the raw pipeline
# @return whatever the underlying client's `pipelined` returns
def pipelined(exception: true)
synchronize do |client|
client.pipelined(exception: exception) do |raw_pipeline|
yield PipelinedConnection.new(raw_pipeline, exception: exception)
end
end
end
# Identifier for this client: the underlying client's id, or its server URL
# when no id is set.
def id
@client.id || @client.server_url
end
# Short description of this client made of Redis::VERSION and #id.
#
# @return [String]
def inspect
"#<Redis client v#{Redis::VERSION} for #{id}>"
end
# Builds a new instance of the same class from the stored options.
# NOTE(review): #initialize removes :inherit_socket from the stored options, so
# the copy is created without that flag — confirm this is intended.
#
# @return [Redis] a new client instance
def dup
self.class.new(@options)
end
# Summary of where this client points.
#
# @return [Hash] with the keys `:host`, `:port`, `:db`, `:id` and `:location`
#   (in that order); `:location` is the host and port joined by a colon
def connection
  details = { host: @client.host, port: @client.port, db: @client.db, id: id }
  details[:location] = "#{@client.host}:#{@client.port}"
  details
end
private
# Build the underlying client from the given options.
#
# Uses Client.sentinel when a `:sentinels` key is present and Client.config
# otherwise; in both cases the options are passed as keyword arguments and
# `new_client` is called on the result.
#
# @param options [Hash] client options
# @raise [RuntimeError] when a `:cluster` key is present
def initialize_client(options)
  raise "Redis Cluster support was moved to the `redis-clustering` gem." if options.key?(:cluster)

  config = options.key?(:sentinels) ? Client.sentinel(**options) : Client.config(**options)
  config.new_client
end
# Runs the given block while holding @monitor, yielding the underlying client.
#
# @yield the underlying client
# @return the block's result
def synchronize
@monitor.synchronize { yield(@client) }
end
# Sends a command through the underlying client's `call_v` while holding @monitor.
#
# A ::RedisClient::Error raised along the way is handed to
# Client.translate_error!.
#
# @param command passed unchanged to `call_v`
#   (presumably an array of the command name and its arguments — confirm)
# @yield optional block forwarded to `call_v`
# @return the result of `call_v`
def send_command(command, &block)
@monitor.synchronize do
@client.call_v(command, &block)
end
rescue ::RedisClient::Error => error
Client.translate_error!(error)
end
# Sends a blocking command through the underlying client's `blocking_call_v`
# while holding @monitor.
#
# NOTE(review): unlike send_command, no ::RedisClient::Error translation is
# done here — presumably the client handles it itself; confirm.
#
# @param command passed unchanged to `blocking_call_v`
# @param timeout passed unchanged to `blocking_call_v`, ahead of the command
# @yield optional block forwarded to `blocking_call_v`
# @return the result of `blocking_call_v`
def send_blocking_command(command, timeout, &block)
@monitor.synchronize do
@client.blocking_call_v(timeout, command, &block)
end
end
# Starts a subscription when a block is given; otherwise forwards a command to
# the subscription that is already in progress.
#
# With a block: a SubscribedClient is created around `@client.pubsub` and
# `method` is invoked on it with `(timeout, *channels)` when `timeout` is
# positive, or with `(*channels)` otherwise. The subscription client is always
# closed and cleared afterwards, even when an error is raised.
#
# Without a block: `[method, *channels]` is sent to the current subscription
# client through `call_v`.
#
# @param method [Symbol] name of the method to invoke / command to forward
# @param timeout [Numeric] only passed along when greater than zero
# @param channels [Array] channel arguments
# @param block [Proc, nil] handler forwarded to the subscription client
# @raise [SubscriptionError] when a block is given while already subscribed, or
#   when no block is given while not subscribed
def _subscription(method, timeout, channels, block)
  unless block
    raise SubscriptionError, "This client is not subscribed" unless @subscription_client

    return @subscription_client.call_v([method].concat(channels))
  end

  raise SubscriptionError, "This client is already subscribed" if @subscription_client

  begin
    @subscription_client = SubscribedClient.new(@client.pubsub)
    arguments = timeout > 0 ? [timeout, *channels] : channels
    @subscription_client.send(method, *arguments, &block)
  ensure
    @subscription_client&.close
    @subscription_client = nil
  end
end
end
require "redis/version"
require "redis/client"
require "redis/pipeline"
require "redis/subscribe"
|