File: client_rate_limiter.rb

package info (click to toggle)
ruby-aws-sdk-core 3.104.3-3%2Bdeb11u2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,444 kB
  • sloc: ruby: 11,201; makefile: 4
file content (139 lines) | stat: -rw-r--r-- 4,585 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# frozen_string_literal: true

module Aws
  module Plugins
    module Retries
      # @api private
      # Used only in 'adaptive' retry mode
      class ClientRateLimiter
        MIN_CAPACITY = 1
        MIN_FILL_RATE = 0.5
        SMOOTH = 0.8
        # How much to scale back after a throttling response
        BETA = 0.7
        # Controls how aggressively we scale up after being throttled
        SCALE_CONSTANT = 0.4

        def initialize
          @mutex                = Mutex.new
          @fill_rate            = nil
          @max_capacity         = nil
          @current_capacity     = 0
          @last_timestamp       = nil
          @enabled              = false
          @measured_tx_rate     = 0
          @last_tx_rate_bucket  = Aws::Util.monotonic_seconds
          @request_count        = 0
          @last_max_rate        = 0
          @last_throttle_time   = Aws::Util.monotonic_seconds
          @calculated_rate      = nil
        end

        def token_bucket_acquire(amount, wait_to_fill = true)
          # Client side throttling is not enabled until we see a
          # throttling error
          return unless @enabled

          @mutex.synchronize do
            token_bucket_refill

            # Next see if we have enough capacity for the requested amount
            while @current_capacity < amount
              raise Aws::Errors::RetryCapacityNotAvailableError unless wait_to_fill
              @mutex.sleep((amount - @current_capacity) / @fill_rate)
              token_bucket_refill
            end
            @current_capacity -= amount
          end
        end

        def update_sending_rate(is_throttling_error)
          @mutex.synchronize do
            update_measured_rate

            if is_throttling_error
              rate_to_use = if @enabled
                              [@measured_tx_rate, @fill_rate].min
                            else
                              @measured_tx_rate
                            end

              # The fill_rate is from the token bucket
              @last_max_rate = rate_to_use
              calculate_time_window
              @last_throttle_time = Aws::Util.monotonic_seconds
              @calculated_rate = cubic_throttle(rate_to_use)
              enable_token_bucket
            else
              calculate_time_window
              @calculated_rate = cubic_success(Aws::Util.monotonic_seconds)
            end

            new_rate = [@calculated_rate, 2 * @measured_tx_rate].min
            token_bucket_update_rate(new_rate)
          end
        end

        private

        def token_bucket_refill
          timestamp = Aws::Util.monotonic_seconds
          unless @last_timestamp
            @last_timestamp = timestamp
            return
          end

          fill_amount = (timestamp - @last_timestamp) * @fill_rate
          @current_capacity = [
            @max_capacity, @current_capacity + fill_amount
          ].min

          @last_timestamp = timestamp
        end

        def token_bucket_update_rate(new_rps)
          # Refill based on our current rate before we update to the
          # new fill rate
          token_bucket_refill
          @fill_rate = [new_rps, MIN_FILL_RATE].max
          @max_capacity = [new_rps, MIN_CAPACITY].max
          # When we scale down we can't have a current capacity that exceeds our
          # max_capacity.
          @current_capacity = [@current_capacity, @max_capacity].min
        end

        def enable_token_bucket
          @enabled = true
        end

        def update_measured_rate
          t = Aws::Util.monotonic_seconds
          time_bucket = (t * 2).floor / 2.0
          @request_count += 1
          if time_bucket > @last_tx_rate_bucket
            current_rate = @request_count / (time_bucket - @last_tx_rate_bucket)
            @measured_tx_rate = (current_rate * SMOOTH) +
              (@measured_tx_rate * (1 - SMOOTH))
            @request_count = 0
            @last_tx_rate_bucket = time_bucket
          end
        end

        def calculate_time_window
          # This is broken out into a separate calculation because it only
          # gets updated when @last_max_rate changes so it can be cached.
          @time_window = ((@last_max_rate * (1 - BETA)) / SCALE_CONSTANT)**(1.0 / 3)
        end

        def cubic_success(timestamp)
          dt = timestamp - @last_throttle_time
          (SCALE_CONSTANT * ((dt - @time_window)**3)) + @last_max_rate
        end

        def cubic_throttle(rate_to_use)
          rate_to_use * BETA
        end
      end
    end
  end
end