# frozen_string_literal: true

class RedisClient
  class CircuitBreaker
    # Middleware that routes every connection attempt and command through
    # the circuit breaker attached to the client config.
    module Middleware
      def connect(config)
        config.circuit_breaker.protect { super }
      end

      def call(_command, config)
        config.circuit_breaker.protect { super }
      end

      def call_pipelined(_commands, config)
        config.circuit_breaker.protect { super }
      end
    end
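
    # Raised by #protect, without running the block, while the circuit is open.
    OpenCircuitError = Class.new(CannotConnectError)

    attr_reader :error_timeout, :error_threshold, :error_threshold_timeout, :success_threshold

    # error_threshold: number of recorded errors that opens the circuit.
    # error_timeout: seconds the circuit stays open before it is retried.
    # error_threshold_timeout: seconds, defaults to error_timeout.
    # success_threshold: successes required while half-open before closing;
    # 0 skips the half-open state.
    def initialize(error_threshold:, error_timeout:, error_threshold_timeout: error_timeout, success_threshold: 0)
      @error_threshold = Integer(error_threshold)
      @error_threshold_timeout = Float(error_threshold_timeout)
      @error_timeout = Float(error_timeout)
      @success_threshold = Integer(success_threshold)
      @errors = []
      @successes = 0
      @state = :closed
      @lock = Mutex.new
    end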

    # Runs the block under the breaker: raises OpenCircuitError while the
    # circuit is open, otherwise yields and records connection errors (and,
    # while half-open, successes).
    def protect
      if @state == :open
        refresh_state
      end

      case @state
      when :open
        raise OpenCircuitError, "Too many connection errors happened recently"
      when :closed
        begin
          yield
        rescue ConnectionError
          record_error
          raise
        end
      when :half_open
        begin
          result = yield
          record_success
          result
        rescue ConnectionError
          record_error
          raise
        end
      else
        raise "[BUG] RedisClient::CircuitBreaker unexpected @state (#{@state.inspect})"
      end
    end
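
    private

    # Leaves the open state once error_timeout seconds have passed since the
    # last recorded error: half-open if successes are required, else closed.
    def refresh_state
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      @lock.synchronize do
        if @errors.last < (now - @error_timeout)
          if @success_threshold > 0
            @state = :half_open
            @successes = 0
          else
            @errors.clear
            @state = :closed
          end
        end
      end
    end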
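
    # Records a connection error and opens the circuit when the threshold is
    # reached, or immediately if the error happened while half-open.
    def record_error
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      # Count errors within the error_threshold_timeout window. The setting
      # was otherwise stored but never read; it defaults to error_timeout,
      # so default behavior is unchanged.
      expiry = now - @error_threshold_timeout
      @lock.synchronize do
        if @state == :closed
          @errors.reject! { |t| t < expiry }
        end
        @errors << now
        @successes = 0
        if @state == :half_open || (@state == :closed && @errors.size >= @error_threshold)
          @state = :open
        end
      end
    end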
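
    # Counts a success while half-open and closes the circuit once
    # success_threshold successes have been recorded.
    def record_success
      return unless @state == :half_open

      @lock.synchronize do
        # Re-check under the lock: another thread may have changed the state.
        return unless @state == :half_open

        @successes += 1
        if @successes >= @success_threshold
          @state = :closed
        end
      end
    end
  end
end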
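
# Usage sketch (illustrative only, not part of the library). It assumes the
# rest of the redis-client gem is loaded so that RedisClient::ConnectionError
# is defined. The threshold values are placeholders and perform_redis_call is
# a hypothetical helper standing in for any code that talks to Redis.
#
#   breaker = RedisClient::CircuitBreaker.new(
#     error_threshold: 3,   # open after 3 connection errors in the counting window
#     error_timeout: 2.0,   # stay open for 2 seconds before retrying
#     success_threshold: 2, # then require 2 successes while half-open
#   )
#
#   begin
#     breaker.protect { perform_redis_call }
#   rescue RedisClient::CircuitBreaker::OpenCircuitError
#     # Circuit is open: the block was not run, so fail fast or use a fallback.
#   rescue RedisClient::ConnectionError
#     # The call itself failed and the breaker has recorded the error.
#   end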