1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493
|
# frozen_string_literal: true
begin
gem "redis", ">= 4.0.1"
require "redis"
require "redis/distributed"
rescue LoadError
warn "The Redis cache store requires the redis gem, version 4.0.1 or later. Please add it to your Gemfile: `gem \"redis\", \"~> 4.0\"`"
raise
end
# Prefer the hiredis driver but don't require it.
begin
require "redis/connection/hiredis"
rescue LoadError
end
require "digest/sha2"
require "active_support/core_ext/marshal"
module ActiveSupport
module Cache
module ConnectionPoolLike
def with
yield self
end
end
::Redis.include(ConnectionPoolLike)
::Redis::Distributed.include(ConnectionPoolLike)
# Redis cache store.
#
# Deployment note: Take care to use a *dedicated Redis cache* rather
# than pointing this at your existing Redis server. It won't cope well
# with mixed usage patterns and it won't expire cache entries by default.
#
# Redis cache server setup guide: https://redis.io/topics/lru-cache
#
# * Supports vanilla Redis, hiredis, and Redis::Distributed.
# * Supports Memcached-like sharding across Redises with Redis::Distributed.
# * Fault tolerant. If the Redis server is unavailable, no exceptions are
# raised. Cache fetches are all misses and writes are dropped.
# * Local cache. Hot in-memory primary cache within block/middleware scope.
# * +read_multi+ and +write_multi+ support for Redis mget/mset. Use Redis::Distributed
# 4.0.1+ for distributed mget support.
# * +delete_matched+ support for Redis KEYS globs.
class RedisCacheStore < Store
# Keys are truncated with their own SHA2 digest if they exceed 1kB
MAX_KEY_BYTESIZE = 1024
DEFAULT_REDIS_OPTIONS = {
connect_timeout: 20,
read_timeout: 1,
write_timeout: 1,
reconnect_attempts: 0,
}
DEFAULT_ERROR_HANDLER = -> (method:, returning:, exception:) do
if logger
logger.error { "RedisCacheStore: #{method} failed, returned #{returning.inspect}: #{exception.class}: #{exception.message}" }
end
end
# The maximum number of entries to receive per SCAN call.
SCAN_BATCH_SIZE = 1000
private_constant :SCAN_BATCH_SIZE
# Advertise cache versioning support.
def self.supports_cache_versioning?
true
end
# Support raw values in the local cache strategy.
module LocalCacheWithRaw # :nodoc:
private
def write_entry(key, entry, **options)
if options[:raw] && local_cache
raw_entry = Entry.new(serialize_entry(entry, raw: true))
raw_entry.expires_at = entry.expires_at
super(key, raw_entry, **options)
else
super
end
end
def write_multi_entries(entries, **options)
if options[:raw] && local_cache
raw_entries = entries.map do |key, entry|
raw_entry = Entry.new(serialize_entry(entry, raw: true))
raw_entry.expires_at = entry.expires_at
end.to_h
super(raw_entries, **options)
else
super
end
end
end
prepend Strategy::LocalCache
prepend LocalCacheWithRaw
class << self
# Factory method to create a new Redis instance.
#
# Handles four options: :redis block, :redis instance, single :url
# string, and multiple :url strings.
#
# Option Class Result
# :redis Proc -> options[:redis].call
# :redis Object -> options[:redis]
# :url String -> Redis.new(url: …)
# :url Array -> Redis::Distributed.new([{ url: … }, { url: … }, …])
#
def build_redis(redis: nil, url: nil, **redis_options) #:nodoc:
urls = Array(url)
if redis.is_a?(Proc)
redis.call
elsif redis
redis
elsif urls.size > 1
build_redis_distributed_client urls: urls, **redis_options
else
build_redis_client url: urls.first, **redis_options
end
end
private
def build_redis_distributed_client(urls:, **redis_options)
::Redis::Distributed.new([], DEFAULT_REDIS_OPTIONS.merge(redis_options)).tap do |dist|
urls.each { |u| dist.add_node url: u }
end
end
def build_redis_client(url:, **redis_options)
::Redis.new DEFAULT_REDIS_OPTIONS.merge(redis_options.merge(url: url))
end
end
attr_reader :redis_options
attr_reader :max_key_bytesize
# Creates a new Redis cache store.
#
# Handles four options: :redis block, :redis instance, single :url
# string, and multiple :url strings.
#
# Option Class Result
# :redis Proc -> options[:redis].call
# :redis Object -> options[:redis]
# :url String -> Redis.new(url: …)
# :url Array -> Redis::Distributed.new([{ url: … }, { url: … }, …])
#
# No namespace is set by default. Provide one if the Redis cache
# server is shared with other apps: <tt>namespace: 'myapp-cache'</tt>.
#
# Compression is enabled by default with a 1kB threshold, so cached
# values larger than 1kB are automatically compressed. Disable by
# passing <tt>compress: false</tt> or change the threshold by passing
# <tt>compress_threshold: 4.kilobytes</tt>.
#
# No expiry is set on cache entries by default. Redis is expected to
# be configured with an eviction policy that automatically deletes
# least-recently or -frequently used keys when it reaches max memory.
# See https://redis.io/topics/lru-cache for cache server setup.
#
# Race condition TTL is not set by default. This can be used to avoid
# "thundering herd" cache writes when hot cache entries are expired.
# See <tt>ActiveSupport::Cache::Store#fetch</tt> for more.
def initialize(namespace: nil, compress: true, compress_threshold: 1.kilobyte, expires_in: nil, race_condition_ttl: nil, error_handler: DEFAULT_ERROR_HANDLER, **redis_options)
@redis_options = redis_options
@max_key_bytesize = MAX_KEY_BYTESIZE
@error_handler = error_handler
super namespace: namespace,
compress: compress, compress_threshold: compress_threshold,
expires_in: expires_in, race_condition_ttl: race_condition_ttl
end
def redis
@redis ||= begin
pool_options = self.class.send(:retrieve_pool_options, redis_options)
if pool_options.any?
self.class.send(:ensure_connection_pool_added!)
::ConnectionPool.new(pool_options) { self.class.build_redis(**redis_options) }
else
self.class.build_redis(**redis_options)
end
end
end
def inspect
instance = @redis || @redis_options
"<##{self.class} options=#{options.inspect} redis=#{instance.inspect}>"
end
# Cache Store API implementation.
#
# Read multiple values at once. Returns a hash of requested keys ->
# fetched values.
def read_multi(*names)
if mget_capable?
instrument(:read_multi, names, options) do |payload|
read_multi_mget(*names).tap do |results|
payload[:hits] = results.keys
end
end
else
super
end
end
# Cache Store API implementation.
#
# Supports Redis KEYS glob patterns:
#
# h?llo matches hello, hallo and hxllo
# h*llo matches hllo and heeeello
# h[ae]llo matches hello and hallo, but not hillo
# h[^e]llo matches hallo, hbllo, ... but not hello
# h[a-b]llo matches hallo and hbllo
#
# Use \ to escape special characters if you want to match them verbatim.
#
# See https://redis.io/commands/KEYS for more.
#
# Failsafe: Raises errors.
def delete_matched(matcher, options = nil)
instrument :delete_matched, matcher do
unless String === matcher
raise ArgumentError, "Only Redis glob strings are supported: #{matcher.inspect}"
end
redis.with do |c|
pattern = namespace_key(matcher, options)
cursor = "0"
# Fetch keys in batches using SCAN to avoid blocking the Redis server.
begin
cursor, keys = c.scan(cursor, match: pattern, count: SCAN_BATCH_SIZE)
c.del(*keys) unless keys.empty?
end until cursor == "0"
end
end
end
# Cache Store API implementation.
#
# Increment a cached value. This method uses the Redis incr atomic
# operator and can only be used on values written with the :raw option.
# Calling it on a value not stored with :raw will initialize that value
# to zero.
#
# Failsafe: Raises errors.
def increment(name, amount = 1, options = nil)
instrument :increment, name, amount: amount do
failsafe :increment do
options = merged_options(options)
key = normalize_key(name, options)
redis.with do |c|
c.incrby(key, amount).tap do
write_key_expiry(c, key, options)
end
end
end
end
end
# Cache Store API implementation.
#
# Decrement a cached value. This method uses the Redis decr atomic
# operator and can only be used on values written with the :raw option.
# Calling it on a value not stored with :raw will initialize that value
# to zero.
#
# Failsafe: Raises errors.
def decrement(name, amount = 1, options = nil)
instrument :decrement, name, amount: amount do
failsafe :decrement do
options = merged_options(options)
key = normalize_key(name, options)
redis.with do |c|
c.decrby(key, amount).tap do
write_key_expiry(c, key, options)
end
end
end
end
end
# Cache Store API implementation.
#
# Removes expired entries. Handled natively by Redis least-recently-/
# least-frequently-used expiry, so manual cleanup is not supported.
def cleanup(options = nil)
super
end
# Clear the entire cache on all Redis servers. Safe to use on
# shared servers if the cache is namespaced.
#
# Failsafe: Raises errors.
def clear(options = nil)
failsafe :clear do
if namespace = merged_options(options)[:namespace]
delete_matched "*", namespace: namespace
else
redis.with { |c| c.flushdb }
end
end
end
def mget_capable? #:nodoc:
set_redis_capabilities unless defined? @mget_capable
@mget_capable
end
def mset_capable? #:nodoc:
set_redis_capabilities unless defined? @mset_capable
@mset_capable
end
private
def set_redis_capabilities
case redis
when Redis::Distributed
@mget_capable = true
@mset_capable = false
else
@mget_capable = true
@mset_capable = true
end
end
# Store provider interface:
# Read an entry from the cache.
def read_entry(key, **options)
failsafe :read_entry do
raw = options&.fetch(:raw, false)
deserialize_entry(redis.with { |c| c.get(key) }, raw: raw)
end
end
def read_multi_entries(names, **options)
if mget_capable?
read_multi_mget(*names)
else
super
end
end
def read_multi_mget(*names)
options = names.extract_options!
options = merged_options(options)
return {} if names == []
raw = options&.fetch(:raw, false)
keys = names.map { |name| normalize_key(name, options) }
values = failsafe(:read_multi_mget, returning: {}) do
redis.with { |c| c.mget(*keys) }
end
names.zip(values).each_with_object({}) do |(name, value), results|
if value
entry = deserialize_entry(value, raw: raw)
unless entry.nil? || entry.expired? || entry.mismatched?(normalize_version(name, options))
results[name] = entry.value
end
end
end
end
# Write an entry to the cache.
#
# Requires Redis 2.6.12+ for extended SET options.
def write_entry(key, entry, unless_exist: false, raw: false, expires_in: nil, race_condition_ttl: nil, **options)
serialized_entry = serialize_entry(entry, raw: raw)
# If race condition TTL is in use, ensure that cache entries
# stick around a bit longer after they would have expired
# so we can purposefully serve stale entries.
if race_condition_ttl && expires_in && expires_in > 0 && !raw
expires_in += 5.minutes
end
failsafe :write_entry, returning: false do
if unless_exist || expires_in
modifiers = {}
modifiers[:nx] = unless_exist
modifiers[:px] = (1000 * expires_in.to_f).ceil if expires_in
redis.with { |c| c.set key, serialized_entry, modifiers }
else
redis.with { |c| c.set key, serialized_entry }
end
end
end
def write_key_expiry(client, key, options)
if options[:expires_in] && client.ttl(key).negative?
client.expire key, options[:expires_in].to_i
end
end
# Delete an entry from the cache.
def delete_entry(key, options)
failsafe :delete_entry, returning: false do
redis.with { |c| c.del key }
end
end
# Nonstandard store provider API to write multiple values at once.
def write_multi_entries(entries, expires_in: nil, **options)
if entries.any?
if mset_capable? && expires_in.nil?
failsafe :write_multi_entries do
redis.with { |c| c.mapped_mset(serialize_entries(entries, raw: options[:raw])) }
end
else
super
end
end
end
# Truncate keys that exceed 1kB.
def normalize_key(key, options)
truncate_key super.b
end
def truncate_key(key)
if key.bytesize > max_key_bytesize
suffix = ":sha2:#{::Digest::SHA2.hexdigest(key)}"
truncate_at = max_key_bytesize - suffix.bytesize
"#{key.byteslice(0, truncate_at)}#{suffix}"
else
key
end
end
def deserialize_entry(serialized_entry, raw:)
if serialized_entry
entry = Marshal.load(serialized_entry) rescue serialized_entry
written_raw = serialized_entry.equal?(entry)
if raw != written_raw
ActiveSupport::Deprecation.warn(<<-MSG.squish)
Using a different value for the raw option when reading and writing
to a cache key is deprecated for :redis_cache_store and Rails 6.0
will stop automatically detecting the format when reading to avoid
marshal loading untrusted raw strings.
MSG
end
entry.is_a?(Entry) ? entry : Entry.new(entry)
end
end
def serialize_entry(entry, raw: false)
if raw
entry.value.to_s
else
Marshal.dump(entry)
end
end
def serialize_entries(entries, raw: false)
entries.transform_values do |entry|
serialize_entry entry, raw: raw
end
end
def failsafe(method, returning: nil)
yield
rescue ::Redis::BaseError => e
handle_exception exception: e, method: method, returning: returning
returning
end
def handle_exception(exception:, method:, returning:)
if @error_handler
@error_handler.(method: method, exception: exception, returning: returning)
end
rescue => failsafe
warn "RedisCacheStore ignored exception in handle_exception: #{failsafe.class}: #{failsafe.message}\n #{failsafe.backtrace.join("\n ")}"
end
end
end
end
|