# frozen_string_literal: true
module HTTPX
module Plugins
#
# This plugin implements a circuit breaker around connection errors.
#
# https://gitlab.com/os85/httpx/wikis/Circuit-Breaker
#
module CircuitBreaker
using URIExtensions
# Loads the plugin's supporting files (the circuit and the circuit store),
# resolved relative to this file. Any arguments are accepted and ignored.
def self.load_dependencies(*)
  %w[circuit circuit_store].each do |component|
    require_relative "circuit_breaker/#{component}"
  end
end
# Returns +options+ merged with the circuit breaker defaults:
# 3 max attempts, a 60 second attempt-reset window, a 60 second break
# interval, and a half-open drip rate of 1.
def self.extra_options(options)
  defaults = {
    circuit_breaker_max_attempts: 3,
    circuit_breaker_reset_attempts_in: 60,
    circuit_breaker_break_in: 60,
    circuit_breaker_half_open_drip_rate: 1,
  }

  options.merge(**defaults)
end
module InstanceMethods
include HTTPX::Callbacks
# Runs the inherited initializer with the original arguments, then builds the
# CircuitStore that the request-sending and response-handling methods of this
# module consult through +@circuit_store+.
#
# NOTE(review): relies on +super+ having populated +@options+ — confirm
# against the session class this module is mixed into.
def initialize(*)
super
@circuit_store = CircuitStore.new(@options)
end
# Generates one public +on_<event>+ registration method per listed event
# (currently only +circuit_open+). Each generated method forwards its block to
# +on(:<event>, &blk)+ and returns +self+, so calls can be chained.
#
# +__FILE__+ and +__LINE__ + 1+ are handed to +class_eval+ so that backtraces
# from the generated methods point at the heredoc lines below.
# NOTE(review): +on+ is presumably provided by the included HTTPX::Callbacks — confirm.
%i[circuit_open].each do |meth|
class_eval(<<-MOD, __FILE__, __LINE__ + 1)
def on_#{meth}(&blk) # def on_circuit_open(&blk)
on(:#{meth}, &blk) # on(:circuit_open, &blk)
self # self
end # end
MOD
end
private
# Sends +requests+, short-circuiting the ones the circuit store answers itself.
#
# Every request is first offered to <tt>@circuit_store.try_respond</tt>. A non-nil
# result is used as that request's response and the request is not forwarded;
# all other requests are passed to +super+ in their original relative order.
#
# Returns an array of responses positionally aligned with +requests+.
def send_requests(*requests)
  # @type var responses: Array[response]
  responses = []
  pending = []
  # original position of each pending request; tracking it here (instead of
  # looking the request up again with Array#index) keeps the merge linear and
  # correct when the same request is passed more than once.
  pending_slots = []

  # run all requests through the circuit breaker, see if the circuit is
  # open for any of them.
  requests.each_with_index do |request, idx|
    short_circuit_response = @circuit_store.try_respond(request)

    if short_circuit_response.nil?
      pending << request
      pending_slots << idx
    else
      responses[idx] = short_circuit_response
    end
  end

  # run requests for the remainder
  unless pending.empty?
    real_responses = super(*pending)

    pending_slots.each_with_index do |slot, idx|
      responses[slot] = real_responses[idx]
    end
  end

  responses
end
# Adds a +:response+ listener on top of the inherited request callbacks:
# each response is evaluated by #try_circuit_open, and +:circuit_open+ is
# emitted with the request whenever that evaluation returns a truthy value.
def set_request_callbacks(request)
  super

  request.on(:response) do |response|
    next unless try_circuit_open(request, response)

    emit(:circuit_open, request)
  end
end
# Feeds the outcome of +request+ into the circuit store.
#
# * Error responses try to open a circuit: keyed by the request URI when the
#   error is a RequestTimeoutError, and by the request origin for any other error.
# * Otherwise, if the request options define +circuit_breaker_break_on+ and it
#   returns a truthy value for +response+, the circuit for the request URI is
#   tried for opening.
# * Any other response tries to close the circuit for the request URI.
#
# Returns whatever <tt>@circuit_store.try_open</tt> returns, or +nil+ when the
# close path was taken.
def try_circuit_open(request, response)
  if response.is_a?(ErrorResponse)
    circuit_key = case response.error
                  when RequestTimeoutError then request.uri
                  else request.origin
                  end

    return @circuit_store.try_open(circuit_key, response)
  end

  custom_rule = request.options.circuit_breaker_break_on
  return @circuit_store.try_open(request.uri, response) if custom_rule && custom_rule.call(response)

  @circuit_store.try_close(request.uri)
  nil
end
end
# adds support for the following options:
#
# :circuit_breaker_max_attempts :: the number of attempts the circuit allows, before it is opened (defaults to <tt>3</tt>).
# :circuit_breaker_reset_attempts_in :: the time a circuit stays open at most, before it resets (defaults to <tt>60</tt>).
# :circuit_breaker_break_on :: callable defining an alternative rule for a response to break
# (i.e. <tt>->(res) { res.status == 429 } </tt>)
# :circuit_breaker_break_in :: the time that must elapse before an open circuit can transit to the half-open state
# (defaults to <tt>60</tt>).
# :circuit_breaker_half_open_drip_rate :: the rate of requests a circuit allows to be performed when in a half-open state
# (defaults to <tt>1</tt>).
module OptionsMethods
private
# Validates the +:circuit_breaker_max_attempts+ option.
#
# +value+ is coerced with Kernel#Integer (which raises on values it cannot
# convert). Returns the resulting integer, or raises TypeError unless it is
# positive.
def option_circuit_breaker_max_attempts(value)
  max_attempts = Integer(value)
  return max_attempts if max_attempts.positive?

  raise TypeError, ":circuit_breaker_max_attempts must be positive"
end
# Validates the +:circuit_breaker_reset_attempts_in+ option.
#
# +value+ is coerced with Kernel#Float (which raises on values it cannot
# convert). Returns the resulting float, or raises TypeError unless it is
# positive.
def option_circuit_breaker_reset_attempts_in(value)
  interval = Float(value)
  return interval if interval.positive?

  raise TypeError, ":circuit_breaker_reset_attempts_in must be positive"
end
# Validates the +:circuit_breaker_break_in+ option.
#
# +value+ is coerced with Kernel#Float (which raises on values it cannot
# convert). Returns the resulting float, or raises TypeError unless it is
# positive.
def option_circuit_breaker_break_in(value)
  interval = Float(value)
  return interval if interval.positive?

  raise TypeError, ":circuit_breaker_break_in must be positive"
end
# Validates the +:circuit_breaker_half_open_drip_rate+ option.
#
# +value+ is coerced with Kernel#Float (which raises on values it cannot
# convert). Returns the resulting float, or raises TypeError unless it lies
# within 0..1 (both ends included).
def option_circuit_breaker_half_open_drip_rate(value)
  drip_rate = Float(value)
  return drip_rate if (0..1).cover?(drip_rate)

  raise TypeError, ":circuit_breaker_half_open_drip_rate must be a number between 0 and 1"
end
# Validates the +:circuit_breaker_break_on+ option.
#
# Returns +value+ unchanged when it responds to +call+; raises TypeError
# otherwise.
def option_circuit_breaker_break_on(value)
  return value if value.respond_to?(:call)

  raise TypeError, ":circuit_breaker_break_on must be called with the response"
end
end
end
register_plugin :circuit_breaker, CircuitBreaker
end
end