File: circuit_breaker.rb

package info (click to toggle)
ruby-httpx 1.7.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,816 kB
  • sloc: ruby: 12,209; makefile: 4
file content (147 lines) | stat: -rw-r--r-- 5,075 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# frozen_string_literal: true

module HTTPX
  module Plugins
    #
    # This plugin implements a circuit breaker around connection errors.
    #
    # https://gitlab.com/os85/httpx/wikis/Circuit-Breaker
    #
    module CircuitBreaker
      using URIExtensions

      def self.load_dependencies(*)
        require_relative "circuit_breaker/circuit"
        require_relative "circuit_breaker/circuit_store"
      end

      def self.extra_options(options)
        options.merge(
          circuit_breaker_max_attempts: 3,
          circuit_breaker_reset_attempts_in: 60,
          circuit_breaker_break_in: 60,
          circuit_breaker_half_open_drip_rate: 1
        )
      end

      module InstanceMethods
        include HTTPX::Callbacks

        def initialize(*)
          super
          @circuit_store = CircuitStore.new(@options)
        end

        %i[circuit_open].each do |meth|
          class_eval(<<-MOD, __FILE__, __LINE__ + 1)
        def on_#{meth}(&blk)   # def on_circuit_open(&blk)
          on(:#{meth}, &blk)   #   on(:circuit_open, &blk)
          self                 #   self
        end                    # end
          MOD
        end

        private

        def send_requests(*requests)
          # @type var short_circuit_responses: Array[response]
          short_circuit_responses = []

          # run all requests through the circuit breaker, see if the circuit is
          # open for any of them.
          real_requests = requests.each_with_index.with_object([]) do |(req, idx), real_reqs|
            short_circuit_response = @circuit_store.try_respond(req)
            if short_circuit_response.nil?
              real_reqs << req
              next
            end
            short_circuit_responses[idx] = short_circuit_response
          end

          # run requests for the remainder
          unless real_requests.empty?
            responses = super(*real_requests)

            real_requests.each_with_index do |request, idx|
              short_circuit_responses[requests.index(request)] = responses[idx]
            end
          end

          short_circuit_responses
        end

        def set_request_callbacks(request)
          super
          request.on(:response) do |response|
            emit(:circuit_open, request) if try_circuit_open(request, response)
          end
        end

        def try_circuit_open(request, response)
          if response.is_a?(ErrorResponse)
            case response.error
            when RequestTimeoutError
              @circuit_store.try_open(request.uri, response)
            else
              @circuit_store.try_open(request.origin, response)
            end
          elsif (break_on = request.options.circuit_breaker_break_on) && break_on.call(response)
            @circuit_store.try_open(request.uri, response)
          else
            @circuit_store.try_close(request.uri)
            nil
          end
        end
      end

      # adds support for the following options:
      #
      # :circuit_breaker_max_attempts :: the number of attempts the circuit allows, before it is opened (defaults to <tt>3</tt>).
      # :circuit_breaker_reset_attempts_in :: the time a circuit stays open at most, before it resets (defaults to <tt>60</tt>).
      # :circuit_breaker_break_on :: callable defining an alternative rule for a response to break
      #                              (i.e. <tt>->(res) { res.status == 429 } </tt>)
      # :circuit_breaker_break_in :: the time that must elapse before an open circuit can transit to the half-open state
      #                              (defaults to <tt><60</tt>).
      # :circuit_breaker_half_open_drip_rate :: the rate of requests a circuit allows to be performed when in an half-open state
      #                                         (defaults to <tt>1</tt>).
      module OptionsMethods
        private

        def option_circuit_breaker_max_attempts(value)
          attempts = Integer(value)
          raise TypeError, ":circuit_breaker_max_attempts must be positive" unless attempts.positive?

          attempts
        end

        def option_circuit_breaker_reset_attempts_in(value)
          timeout = Float(value)
          raise TypeError, ":circuit_breaker_reset_attempts_in must be positive" unless timeout.positive?

          timeout
        end

        def option_circuit_breaker_break_in(value)
          timeout = Float(value)
          raise TypeError, ":circuit_breaker_break_in must be positive" unless timeout.positive?

          timeout
        end

        def option_circuit_breaker_half_open_drip_rate(value)
          ratio = Float(value)
          raise TypeError, ":circuit_breaker_half_open_drip_rate must be a number between 0 and 1" unless (0..1).cover?(ratio)

          ratio
        end

        def option_circuit_breaker_break_on(value)
          raise TypeError, ":circuit_breaker_break_on must be called with the response" unless value.respond_to?(:call)

          value
        end
      end
    end
    register_plugin :circuit_breaker, CircuitBreaker
  end
end