File: test_adaptive.py

package info (click to toggle)
python-aiobotocore 2.25.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,524 kB
  • sloc: python: 15,437; makefile: 84
file content (199 lines) | stat: -rw-r--r-- 7,898 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# The following tests are an adaptation of the unit tests for the original (sync)
# ClientRateLimiter and TokenBucket in botocore.
# See https://github.com/boto/botocore, files
# `/tests/unit/retries/test_bucket.py` and `/tests/unit/retries/test_adaptive.py`.

from unittest import mock

import pytest
from botocore.exceptions import CapacityNotAvailableError
from botocore.retries import standard, throttling

from aiobotocore.retries import adaptive, bucket


class _SleepMethodCalled(Exception):
    """Raised to explicitly fail a test for calling the blocking `sleep` method."""

    pass


class FakeClock(bucket.Clock):
    """Clock stub that replays a caller-supplied list of timestamps."""

    def __init__(self, timestamp_sequences):
        # The list is stored by reference, not copied, so timestamps appended
        # to it later are still visible to `current_time`.
        self.sleep_call_amounts = []
        self.timestamp_sequences = timestamp_sequences

    def current_time(self):
        """Return the next queued timestamp, removing it from the front."""
        next_timestamp = self.timestamp_sequences.pop(0)
        return next_timestamp

    def sleep(self, amount):
        """Always raise `_SleepMethodCalled`; blocking sleeps are not allowed."""
        reason = (
            "sleep method should never be called, non-blocking behavior expected"
        )
        raise _SleepMethodCalled(reason)


class TestAsyncClientRateLimiter:
    """Exercise `AsyncClientRateLimiter` against mocked collaborators."""

    @pytest.fixture(autouse=True)
    def _setup(self):
        # One initial timestamp; individual tests append more as needed.
        self.timestamp_sequences = [0]
        self.clock = FakeClock(self.timestamp_sequences)
        self.throttling_detector = mock.Mock(
            spec=standard.ThrottlingErrorDetector
        )
        self.rate_clocker = mock.Mock(spec=adaptive.RateClocker)
        self.rate_adjustor = mock.Mock(spec=throttling.CubicCalculator)
        self.token_bucket = mock.Mock(spec=bucket.AsyncTokenBucket)

    def create_client_limiter(self):
        """Build a rate limiter wired to this test's mocks and fake clock."""
        return adaptive.AsyncClientRateLimiter(
            clock=self.clock,
            throttling_detector=self.throttling_detector,
            token_bucket=self.token_bucket,
            rate_clocker=self.rate_clocker,
            rate_adjustor=self.rate_adjustor,
        )

    async def test_bucket_bucket_acquisition_only_if_enabled(self):
        # A fresh limiter that has not processed any response must not touch
        # the token bucket when a request is sent.
        limiter = self.create_client_limiter()
        await limiter.on_sending_request(request=mock.sentinel.request)
        self.token_bucket.acquire.assert_not_called()

    async def test_token_bucket_enabled_on_throttling_error(self):
        limiter = self.create_client_limiter()
        self.rate_adjustor.error_received.return_value = 17
        self.rate_clocker.record.return_value = 21
        self.throttling_detector.is_throttling_error.return_value = True
        await limiter.on_receiving_response()
        # Now that a throttling error was received, calling
        # on_sending_request should try to acquire a token.
        self.timestamp_sequences.append(1)
        await limiter.on_sending_request(request=mock.sentinel.request)
        self.token_bucket.acquire.assert_called()

    async def test_max_rate_updated_on_success_response(self):
        adjusted_rate = 20
        limiter = self.create_client_limiter()
        self.rate_clocker.record.return_value = 21
        self.rate_adjustor.success_received.return_value = adjusted_rate
        self.throttling_detector.is_throttling_error.return_value = False
        await limiter.on_receiving_response()
        self.token_bucket.set_max_rate.assert_called_with(adjusted_rate)

    async def test_max_rate_cant_exceed_20_percent_max(self):
        # The measured sending rate is 20 TPS ...
        measured_tps = 20
        # ... while the rate adjustor asks to go up to 100 TPS.
        requested_tps = 100

        limiter = self.create_client_limiter()
        self.throttling_detector.is_throttling_error.return_value = False
        self.rate_clocker.record.return_value = measured_tps
        self.rate_adjustor.success_received.return_value = requested_tps

        await limiter.on_receiving_response()
        # The new max rate is expected to be capped at 2.0 * measured rate.
        self.token_bucket.set_max_rate.assert_called_with(2.0 * measured_tps)


class TestAsyncTokenBucket:
    """Exercise `AsyncTokenBucket` with a scripted fake clock."""

    @pytest.fixture(autouse=True)
    def _setup(self):
        # The same list object is handed to the clock, so tests can queue
        # further timestamps on it at any point.
        timestamps = [0]
        self.timestamp_sequences = timestamps
        self.clock = FakeClock(timestamps)

    def create_token_bucket(self, max_rate=10, min_rate=0.1):
        """Return an `AsyncTokenBucket` driven by this test's fake clock."""
        return bucket.AsyncTokenBucket(
            clock=self.clock, min_rate=min_rate, max_rate=max_rate
        )

    async def _acquire_single_tokens(self, token_bucket, count):
        """Acquire one token `count` times in a row in non-blocking mode."""
        for _ in range(count):
            acquired = await token_bucket.acquire(1, block=False)
            assert acquired

    async def test_can_acquire_amount(self):
        # One request per second (timestamps 1..5), which is well below the
        # 10 TPS fill rate.
        self.timestamp_sequences.extend(range(1, 6))
        token_bucket = self.create_token_bucket(max_rate=10)
        await self._acquire_single_tokens(token_bucket, count=5)

    async def test_can_change_max_capacity_lower(self):
        # Requests arrive at 1 TPS.
        self.timestamp_sequences.extend(range(1, 6))
        token_bucket = self.create_token_bucket(max_rate=10)
        # The first 5 tokens are requested while max_rate=10.
        await self._acquire_single_tokens(token_bucket, count=5)
        # On the 5th second the max_rate is scaled down to 1.
        self.timestamp_sequences.append(5)
        await token_bucket.set_max_rate(1)
        # From seconds 6 through 10 we again request one token per second.
        self.timestamp_sequences.extend(range(6, 11))
        await self._acquire_single_tokens(token_bucket, count=5)

    async def test_max_capacity_is_at_least_one(self):
        token_bucket = self.create_token_bucket()
        self.timestamp_sequences.append(1)
        await token_bucket.set_max_rate(0.5)
        # The fill rate follows the requested rate, while the capacity is
        # expected to stay at 1.
        assert token_bucket._max_capacity == 1
        assert token_bucket._fill_rate == 0.5

    async def test_acquire_fails_on_non_block_mode_returns_false(self):
        # First the initial creation time, then a token request 1 second
        # later.
        self.timestamp_sequences.extend([0, 1])
        token_bucket = self.create_token_bucket(max_rate=10)
        # Asking for 100 tokens without blocking is expected to raise.
        with pytest.raises(CapacityNotAvailableError):
            await token_bucket.acquire(100, block=False)

    async def test_can_retrieve_at_max_send_rate(self):
        # A new token is requested every 100ms (10 TPS) for 2 seconds.
        request_count = 20
        self.timestamp_sequences.extend(
            1 + 0.1 * i for i in range(request_count)
        )
        token_bucket = self.create_token_bucket(max_rate=10)
        await self._acquire_single_tokens(token_bucket, count=request_count)

    async def test_acquiring_blocks_when_capacity_reached(self):
        # max_rate=10 means 1 token every 0.1 seconds.
        token_bucket = self.create_token_bucket(max_rate=10)
        scripted_times = [
            # First acquire() happens after .1 seconds.
            0.1,
            # Second acquire() cannot succeed right away because tokens
            # arrive at 1 per 0.1 seconds, so it has to wait for the next
            # token.
            0.15,
            # At 0.2 seconds the token is available.
            0.2,
            # At 0.3 seconds another token can be taken without issues.
            # Because such small units are used (to avoid bloating the test
            # run time), this has to be slightly over 0.3 seconds.
            0.300001,
        ]
        self.timestamp_sequences.extend(scripted_times)

        first = await token_bucket.acquire(1, block=False)
        assert first
        assert token_bucket._current_capacity == 0

        second = await token_bucket.acquire(1, block=True)
        assert second
        assert token_bucket._current_capacity == 0

        third = await token_bucket.acquire(1, block=False)
        assert third

    async def test_rate_cant_go_below_min(self):
        token_bucket = self.create_token_bucket(max_rate=1, min_rate=0.2)
        self.timestamp_sequences.append(1)
        # Request a rate below min_rate; the fill rate is expected to stay
        # at min_rate.
        await token_bucket.set_max_rate(0.1)
        assert token_bucket._fill_rate == 0.2
        assert token_bucket._current_capacity == 1