File: adaptive.py

package info (click to toggle)
python-botocore 1.37.9%2Brepack-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 121,768 kB
  • sloc: python: 73,696; xml: 14,880; javascript: 181; makefile: 155
file content (132 lines) | stat: -rw-r--r-- 4,207 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import logging
import math
import threading

from botocore.retries import bucket, standard, throttling

logger = logging.getLogger(__name__)


def register_retry_handler(client):
    clock = bucket.Clock()
    rate_adjustor = throttling.CubicCalculator(
        starting_max_rate=0, start_time=clock.current_time()
    )
    token_bucket = bucket.TokenBucket(max_rate=1, clock=clock)
    rate_clocker = RateClocker(clock)
    throttling_detector = standard.ThrottlingErrorDetector(
        retry_event_adapter=standard.RetryEventAdapter(),
    )
    limiter = ClientRateLimiter(
        rate_adjustor=rate_adjustor,
        rate_clocker=rate_clocker,
        token_bucket=token_bucket,
        throttling_detector=throttling_detector,
        clock=clock,
    )
    client.meta.events.register(
        'before-send',
        limiter.on_sending_request,
    )
    client.meta.events.register(
        'needs-retry',
        limiter.on_receiving_response,
    )
    return limiter


class ClientRateLimiter:
    _MAX_RATE_ADJUST_SCALE = 2.0

    def __init__(
        self,
        rate_adjustor,
        rate_clocker,
        token_bucket,
        throttling_detector,
        clock,
    ):
        self._rate_adjustor = rate_adjustor
        self._rate_clocker = rate_clocker
        self._token_bucket = token_bucket
        self._throttling_detector = throttling_detector
        self._clock = clock
        self._enabled = False
        self._lock = threading.Lock()

    def on_sending_request(self, request, **kwargs):
        if self._enabled:
            self._token_bucket.acquire()

    # Hooked up to needs-retry.
    def on_receiving_response(self, **kwargs):
        measured_rate = self._rate_clocker.record()
        timestamp = self._clock.current_time()
        with self._lock:
            if not self._throttling_detector.is_throttling_error(**kwargs):
                new_rate = self._rate_adjustor.success_received(timestamp)
            else:
                if not self._enabled:
                    rate_to_use = measured_rate
                else:
                    rate_to_use = min(
                        measured_rate, self._token_bucket.max_rate
                    )
                new_rate = self._rate_adjustor.error_received(
                    rate_to_use, timestamp
                )
                logger.debug(
                    "Throttling response received, new send rate: %s "
                    "measured rate: %s, token bucket capacity "
                    "available: %s",
                    new_rate,
                    measured_rate,
                    self._token_bucket.available_capacity,
                )
                self._enabled = True
            self._token_bucket.max_rate = min(
                new_rate, self._MAX_RATE_ADJUST_SCALE * measured_rate
            )


class RateClocker:
    """Tracks the rate at which a client is sending a request."""

    _DEFAULT_SMOOTHING = 0.8
    # Update the rate every _TIME_BUCKET_RANGE seconds.
    _TIME_BUCKET_RANGE = 0.5

    def __init__(
        self,
        clock,
        smoothing=_DEFAULT_SMOOTHING,
        time_bucket_range=_TIME_BUCKET_RANGE,
    ):
        self._clock = clock
        self._measured_rate = 0
        self._smoothing = smoothing
        self._last_bucket = math.floor(self._clock.current_time())
        self._time_bucket_scale = 1 / self._TIME_BUCKET_RANGE
        self._count = 0
        self._lock = threading.Lock()

    def record(self, amount=1):
        with self._lock:
            t = self._clock.current_time()
            bucket = (
                math.floor(t * self._time_bucket_scale)
                / self._time_bucket_scale
            )
            self._count += amount
            if bucket > self._last_bucket:
                current_rate = self._count / float(bucket - self._last_bucket)
                self._measured_rate = (current_rate * self._smoothing) + (
                    self._measured_rate * (1 - self._smoothing)
                )
                self._count = 0
                self._last_bucket = bucket
            return self._measured_rate

    @property
    def measured_rate(self):
        return self._measured_rate