1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
# frozen_string_literal: true
module Aws
  module Plugins
    module Retries
      # @api private
      # Used only in 'adaptive' retry mode
      #
      # Client-side rate limiter built from three pieces of state that are all
      # guarded by a single mutex:
      #
      # * a token bucket (+token_bucket_*+ methods) that callers draw from
      #   before sending,
      # * a smoothed measurement of how fast requests are actually being sent,
      # * a calculated target rate that is cut by BETA on a throttling
      #   response and otherwise follows a cubic function of the time since
      #   the last throttle.
      #
      # The token bucket stays inactive until the first throttling error is
      # reported through {#update_sending_rate}.
      class ClientRateLimiter
        # Lower bound for the token bucket's maximum capacity.
        MIN_CAPACITY = 1

        # Lower bound for the token bucket's fill rate.
        MIN_FILL_RATE = 0.5

        # Weight given to the newest sample when averaging the measured
        # sending rate; the previous value is weighted by (1 - SMOOTH).
        SMOOTH = 0.8

        # How much to scale back after a throttling response
        BETA = 0.7

        # Controls how aggressively we scale up after being throttled
        SCALE_CONSTANT = 0.4

        def initialize
          @mutex = Mutex.new

          # Token bucket. The rate and capacity stay nil until the first
          # call to #update_sending_rate assigns them.
          @enabled = false
          @fill_rate = nil
          @max_capacity = nil
          @current_capacity = 0
          @last_timestamp = nil

          # Measured sending rate.
          @measured_tx_rate = 0
          @request_count = 0
          @last_tx_rate_bucket = Aws::Util.monotonic_seconds

          # Calculated target rate.
          @last_max_rate = 0
          @calculated_rate = nil
          @last_throttle_time = Aws::Util.monotonic_seconds
        end

        # Takes +amount+ tokens out of the bucket.
        #
        # Does nothing (and returns nil) while the bucket is not enabled,
        # i.e. before any throttling error has been reported.
        #
        # @param amount [Numeric] number of tokens required
        # @param wait_to_fill [Boolean] when true, sleep on the mutex until
        #   enough tokens have accumulated; when false, raise instead
        # @raise [Aws::Errors::RetryCapacityNotAvailableError] if the bucket
        #   holds fewer than +amount+ tokens and +wait_to_fill+ is false
        def token_bucket_acquire(amount, wait_to_fill = true)
          return unless @enabled

          @mutex.synchronize do
            token_bucket_refill

            # Re-check after every wake-up: the refill may still fall short.
            while @current_capacity < amount
              raise Aws::Errors::RetryCapacityNotAvailableError unless wait_to_fill

              shortfall = amount - @current_capacity
              # Mutex#sleep releases the lock for the duration of the wait.
              @mutex.sleep(shortfall / @fill_rate)
              token_bucket_refill
            end

            @current_capacity -= amount
          end
        end

        # Records one request outcome and re-derives the bucket's rate.
        #
        # A throttling error remembers the rate in use as the last maximum,
        # scales the calculated rate back by BETA and enables the bucket.
        # Any other outcome moves the calculated rate along the cubic curve.
        # Either way, the rate handed to the bucket is capped at twice the
        # measured sending rate.
        #
        # @param is_throttling_error [Boolean] whether the response that was
        #   just received was a throttling error
        def update_sending_rate(is_throttling_error)
          @mutex.synchronize do
            update_measured_rate

            if is_throttling_error
              # Once the bucket is active, its fill rate also bounds the
              # rate we consider ourselves to have been throttled at.
              throttled_at_rate = @measured_tx_rate
              throttled_at_rate = [@measured_tx_rate, @fill_rate].min if @enabled

              @last_max_rate = throttled_at_rate
              calculate_time_window
              @last_throttle_time = Aws::Util.monotonic_seconds
              @calculated_rate = cubic_throttle(throttled_at_rate)
              enable_token_bucket
            else
              calculate_time_window
              @calculated_rate = cubic_success(Aws::Util.monotonic_seconds)
            end

            rate_ceiling = 2 * @measured_tx_rate
            token_bucket_update_rate([@calculated_rate, rate_ceiling].min)
          end
        end

        private

        # Adds the tokens earned since the previous refill, never exceeding
        # @max_capacity. The very first call only records the timestamp,
        # because there is no earlier reading to measure from.
        def token_bucket_refill
          now = Aws::Util.monotonic_seconds

          if @last_timestamp
            earned = (now - @last_timestamp) * @fill_rate
            @current_capacity = [
              @max_capacity, @current_capacity + earned
            ].min
            @last_timestamp = now
          else
            @last_timestamp = now
            nil
          end
        end

        # Switches the bucket to +new_rps+, subject to the MIN_FILL_RATE and
        # MIN_CAPACITY floors.
        def token_bucket_update_rate(new_rps)
          # Credit tokens at the old rate up to this instant before the new
          # rate takes over.
          token_bucket_refill

          @max_capacity = [new_rps, MIN_CAPACITY].max
          @fill_rate = [new_rps, MIN_FILL_RATE].max

          # Scaling down must not leave more tokens than the bucket can hold.
          @current_capacity = [@current_capacity, @max_capacity].min
        end

        # Activates the token bucket; #token_bucket_acquire is a no-op until
        # this has run.
        def enable_token_bucket
          @enabled = true
        end

        # Counts the current request and, once the clock has moved into a
        # later half-second bucket, folds the rate observed since the last
        # bucket into @measured_tx_rate using the SMOOTH weighting.
        def update_measured_rate
          now = Aws::Util.monotonic_seconds
          # Round the clock down to the nearest half second.
          half_second_bucket = (now * 2).floor / 2.0
          @request_count += 1
          return unless half_second_bucket > @last_tx_rate_bucket

          elapsed = half_second_bucket - @last_tx_rate_bucket
          sample = @request_count / elapsed
          @measured_tx_rate = (sample * SMOOTH) +
                              (@measured_tx_rate * (1 - SMOOTH))
          @request_count = 0
          @last_tx_rate_bucket = half_second_bucket
        end

        # Derives @time_window from @last_max_rate. With this value,
        # #cubic_success evaluated at the moment of the throttle yields
        # @last_max_rate * BETA.
        def calculate_time_window
          reduction = @last_max_rate * (1 - BETA)
          @time_window = (reduction / SCALE_CONSTANT)**(1.0 / 3)
        end

        # Target rate at +timestamp+: a cubic in the time elapsed since the
        # last throttle, shifted by @time_window and offset by
        # @last_max_rate.
        def cubic_success(timestamp)
          since_throttle = timestamp - @last_throttle_time
          offset = since_throttle - @time_window
          (SCALE_CONSTANT * (offset**3)) + @last_max_rate
        end

        # Target rate immediately after a throttle: +rate_to_use+ scaled
        # back by BETA.
        def cubic_throttle(rate_to_use)
          rate_to_use * BETA
        end
      end
    end
  end
end
|