1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
# frozen_string_literal: true
require "connection_pool"
require "redis"
require "uri"
module Sidekiq
class RedisConnection
class << self
def create(options = {})
symbolized_options = options.transform_keys(&:to_sym)
if !symbolized_options[:url] && (u = determine_redis_provider)
symbolized_options[:url] = u
end
size = if symbolized_options[:size]
symbolized_options[:size]
elsif Sidekiq.server?
# Give ourselves plenty of connections. pool is lazy
# so we won't create them until we need them.
Sidekiq.options[:concurrency] + 5
elsif ENV["RAILS_MAX_THREADS"]
Integer(ENV["RAILS_MAX_THREADS"])
else
5
end
verify_sizing(size, Sidekiq.options[:concurrency]) if Sidekiq.server?
pool_timeout = symbolized_options[:pool_timeout] || 1
log_info(symbolized_options)
ConnectionPool.new(timeout: pool_timeout, size: size) do
build_client(symbolized_options)
end
end
private
# Sidekiq needs a lot of concurrent Redis connections.
#
# We need a connection for each Processor.
# We need a connection for Pro's real-time change listener
# We need a connection to various features to call Redis every few seconds:
# - the process heartbeat.
# - enterprise's leader election
# - enterprise's cron support
def verify_sizing(size, concurrency)
raise ArgumentError, "Your Redis connection pool is too small for Sidekiq to work. Your pool has #{size} connections but must have at least #{concurrency + 2}" if size < (concurrency + 2)
end
def build_client(options)
namespace = options[:namespace]
client = Redis.new client_opts(options)
if namespace
begin
require "redis/namespace"
Redis::Namespace.new(namespace, redis: client)
rescue LoadError
Sidekiq.logger.error("Your Redis configuration uses the namespace '#{namespace}' but the redis-namespace gem is not included in the Gemfile." \
"Add the gem to your Gemfile to continue using a namespace. Otherwise, remove the namespace parameter.")
exit(-127)
end
else
client
end
end
def client_opts(options)
opts = options.dup
if opts[:namespace]
opts.delete(:namespace)
end
if opts[:network_timeout]
opts[:timeout] = opts[:network_timeout]
opts.delete(:network_timeout)
end
opts[:driver] ||= Redis::Connection.drivers.last || "ruby"
# Issue #3303, redis-rb will silently retry an operation.
# This can lead to duplicate jobs if Sidekiq::Client's LPUSH
# is performed twice but I believe this is much, much rarer
# than the reconnect silently fixing a problem; we keep it
# on by default.
opts[:reconnect_attempts] ||= 1
opts
end
def log_info(options)
redacted = "REDACTED"
# Deep clone so we can muck with these options all we want and exclude
# params from dump-and-load that may contain objects that Marshal is
# unable to safely dump.
keys = options.keys - [:logger, :ssl_params]
scrubbed_options = Marshal.load(Marshal.dump(options.slice(*keys)))
if scrubbed_options[:url] && (uri = URI.parse(scrubbed_options[:url])) && uri.password
uri.password = redacted
scrubbed_options[:url] = uri.to_s
end
if scrubbed_options[:password]
scrubbed_options[:password] = redacted
end
scrubbed_options[:sentinels]&.each do |sentinel|
sentinel[:password] = redacted if sentinel[:password]
end
if Sidekiq.server?
Sidekiq.logger.info("Booting Sidekiq #{Sidekiq::VERSION} with redis options #{scrubbed_options}")
else
Sidekiq.logger.debug("#{Sidekiq::NAME} client with redis options #{scrubbed_options}")
end
end
def determine_redis_provider
# If you have this in your environment:
# MY_REDIS_URL=redis://hostname.example.com:1238/4
# then set:
# REDIS_PROVIDER=MY_REDIS_URL
# and Sidekiq will find your custom URL variable with no custom
# initialization code at all.
#
p = ENV["REDIS_PROVIDER"]
if p && p =~ /:/
raise <<~EOM
REDIS_PROVIDER should be set to the name of the variable which contains the Redis URL, not a URL itself.
Platforms like Heroku will sell addons that publish a *_URL variable. You need to tell Sidekiq with REDIS_PROVIDER, e.g.:
REDISTOGO_URL=redis://somehost.example.com:6379/4
REDIS_PROVIDER=REDISTOGO_URL
EOM
end
ENV[
p || "REDIS_URL"
]
end
end
end
end
|