File: adaptive.py

package info (click to toggle)
python-botocore 1.20.0%2Brepack-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 60,608 kB
  • sloc: python: 50,632; xml: 15,052; makefile: 131
file content (117 lines) | stat: -rw-r--r-- 4,191 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import math
import logging
import threading

from botocore.retries import bucket
from botocore.retries import throttling
from botocore.retries import standard


logger = logging.getLogger(__name__)


def register_retry_handler(client):
    clock = bucket.Clock()
    rate_adjustor = throttling.CubicCalculator(starting_max_rate=0,
                                               start_time=clock.current_time())
    token_bucket = bucket.TokenBucket(max_rate=1, clock=clock)
    rate_clocker = RateClocker(clock)
    throttling_detector = standard.ThrottlingErrorDetector(
        retry_event_adapter=standard.RetryEventAdapter(),
    )
    limiter = ClientRateLimiter(
        rate_adjustor=rate_adjustor,
        rate_clocker=rate_clocker,
        token_bucket=token_bucket,
        throttling_detector=throttling_detector,
        clock=clock,
    )
    client.meta.events.register(
        'before-send', limiter.on_sending_request,
    )
    client.meta.events.register(
        'needs-retry', limiter.on_receiving_response,
    )
    return limiter


class ClientRateLimiter(object):

    _MAX_RATE_ADJUST_SCALE = 2.0

    def __init__(self, rate_adjustor, rate_clocker, token_bucket,
                 throttling_detector, clock):
        self._rate_adjustor = rate_adjustor
        self._rate_clocker = rate_clocker
        self._token_bucket = token_bucket
        self._throttling_detector = throttling_detector
        self._clock = clock
        self._enabled = False
        self._lock = threading.Lock()

    def on_sending_request(self, request, **kwargs):
        if self._enabled:
            self._token_bucket.acquire()

    # Hooked up to needs-retry.
    def on_receiving_response(self, **kwargs):
        measured_rate = self._rate_clocker.record()
        timestamp = self._clock.current_time()
        with self._lock:
            if not self._throttling_detector.is_throttling_error(**kwargs):
                throttling = False
                new_rate = self._rate_adjustor.success_received(timestamp)
            else:
                throttling = True
                if not self._enabled:
                    rate_to_use = measured_rate
                else:
                    rate_to_use = min(measured_rate, self._token_bucket.max_rate)
                new_rate = self._rate_adjustor.error_received(
                    rate_to_use, timestamp)
                logger.debug("Throttling response received, new send rate: %s "
                             "measured rate: %s, token bucket capacity "
                             "available: %s", new_rate, measured_rate,
                             self._token_bucket.available_capacity)
                self._enabled = True
            self._token_bucket.max_rate = min(
                new_rate, self._MAX_RATE_ADJUST_SCALE * measured_rate)


class RateClocker(object):
    """Tracks the rate at which a client is sending a request."""

    _DEFAULT_SMOOTHING = 0.8
    # Update the rate every _TIME_BUCKET_RANGE seconds.
    _TIME_BUCKET_RANGE = 0.5

    def __init__(self, clock, smoothing=_DEFAULT_SMOOTHING,
                 time_bucket_range=_TIME_BUCKET_RANGE):
        self._clock = clock
        self._measured_rate = 0
        self._smoothing = smoothing
        self._last_bucket = math.floor(self._clock.current_time())
        self._time_bucket_scale = 1 / self._TIME_BUCKET_RANGE
        self._count = 0
        self._lock = threading.Lock()

    def record(self, amount=1):
        with self._lock:
            t = self._clock.current_time()
            bucket = math.floor(
                t * self._time_bucket_scale) / self._time_bucket_scale
            self._count += amount
            if bucket > self._last_bucket:
                current_rate = self._count / float(
                    bucket - self._last_bucket)
                self._measured_rate = (
                    (current_rate * self._smoothing) +
                    (self._measured_rate * (1 - self._smoothing))
                )
                self._count = 0
                self._last_bucket = bucket
            return self._measured_rate

    @property
    def measured_rate(self):
        return self._measured_rate