File: circuit_breaker.rb

package info (click to toggle)
ruby-redis-client 0.22.2-1.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 224 kB
  • sloc: ruby: 2,079; makefile: 4
file content (108 lines) | stat: -rw-r--r-- 2,656 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# frozen_string_literal: true

class RedisClient
  class CircuitBreaker
    module Middleware
      def connect(config)
        config.circuit_breaker.protect { super }
      end

      def call(_command, config)
        config.circuit_breaker.protect { super }
      end

      def call_pipelined(_commands, config)
        config.circuit_breaker.protect { super }
      end
    end

    OpenCircuitError = Class.new(CannotConnectError)

    attr_reader :error_timeout, :error_threshold, :error_threshold_timeout, :success_threshold

    def initialize(error_threshold:, error_timeout:, error_threshold_timeout: error_timeout, success_threshold: 0)
      @error_threshold = Integer(error_threshold)
      @error_threshold_timeout = Float(error_threshold_timeout)
      @error_timeout = Float(error_timeout)
      @success_threshold = Integer(success_threshold)
      @errors = []
      @successes = 0
      @state = :closed
      @lock = Mutex.new
    end

    def protect
      if @state == :open
        refresh_state
      end

      case @state
      when :open
        raise OpenCircuitError, "Too many connection errors happened recently"
      when :closed
        begin
          yield
        rescue ConnectionError
          record_error
          raise
        end
      when :half_open
        begin
          result = yield
          record_success
          result
        rescue ConnectionError
          record_error
          raise
        end
      else
        raise "[BUG] RedisClient::CircuitBreaker unexpected @state (#{@state.inspect}})"
      end
    end

    private

    def refresh_state
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      @lock.synchronize do
        if @errors.last < (now - @error_timeout)
          if @success_threshold > 0
            @state = :half_open
            @successes = 0
          else
            @errors.clear
            @state = :closed
          end
        end
      end
    end

    def record_error
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      expiry = now - @error_timeout
      @lock.synchronize do
        if @state == :closed
          @errors.reject! { |t| t < expiry }
        end
        @errors << now
        @successes = 0
        if @state == :half_open || (@state == :closed && @errors.size >= @error_threshold)
          @state = :open
        end
      end
    end

    def record_success
      return unless @state == :half_open

      @lock.synchronize do
        return unless @state == :half_open

        @successes += 1
        if @successes >= @success_threshold
          @state = :closed
        end
      end
    end
  end
end