File: test_bucket.py

package info (click to toggle)
python-botocore 1.40.72%2Brepack-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 128,088 kB
  • sloc: python: 76,667; xml: 14,037; javascript: 181; makefile: 157
file content (121 lines) | stat: -rw-r--r-- 4,602 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from botocore.exceptions import CapacityNotAvailableError
from botocore.retries import bucket
from tests import unittest


class FakeClock(bucket.Clock):
    def __init__(self, timestamp_sequences):
        self.timestamp_sequences = timestamp_sequences
        self.sleep_call_amounts = []

    def sleep(self, amount):
        self.sleep_call_amounts.append(amount)

    def current_time(self):
        return self.timestamp_sequences.pop(0)


class TestTokenBucket(unittest.TestCase):
    def setUp(self):
        self.timestamp_sequences = [0]
        self.clock = FakeClock(self.timestamp_sequences)

    def create_token_bucket(self, max_rate=10, min_rate=0.1):
        return bucket.TokenBucket(
            max_rate=max_rate, clock=self.clock, min_rate=min_rate
        )

    def test_can_acquire_amount(self):
        self.timestamp_sequences.extend(
            [
                # Requests tokens every second, which is well below our
                # 10 TPS fill rate.
                1,
                2,
                3,
                4,
                5,
            ]
        )
        token_bucket = self.create_token_bucket(max_rate=10)
        for _ in range(5):
            self.assertTrue(token_bucket.acquire(1, block=False))

    def test_can_change_max_capacity_lower(self):
        # Requests at 1 TPS.
        self.timestamp_sequences.extend([1, 2, 3, 4, 5])
        token_bucket = self.create_token_bucket(max_rate=10)
        # Request the first 5 tokens with max_rate=10
        for _ in range(5):
            self.assertTrue(token_bucket.acquire(1, block=False))
        # Now scale the max_rate down to 1 on the 5th second.
        self.timestamp_sequences.append(5)
        token_bucket.max_rate = 1
        # And then from seconds 6-10 we request at one per second.
        self.timestamp_sequences.extend([6, 7, 8, 9, 10])
        for _ in range(5):
            self.assertTrue(token_bucket.acquire(1, block=False))

    def test_max_capacity_is_at_least_one(self):
        token_bucket = self.create_token_bucket()
        self.timestamp_sequences.append(1)
        token_bucket.max_rate = 0.5
        self.assertEqual(token_bucket.max_rate, 0.5)
        self.assertEqual(token_bucket.max_capacity, 1)

    def test_acquire_fails_on_non_block_mode_returns_false(self):
        self.timestamp_sequences.extend(
            [
                # Initial creation time.
                0,
                # Requests a token 1 second later.
                1,
            ]
        )
        token_bucket = self.create_token_bucket(max_rate=10)
        with self.assertRaises(CapacityNotAvailableError):
            token_bucket.acquire(100, block=False)

    def test_can_retrieve_at_max_send_rate(self):
        self.timestamp_sequences.extend(
            [
                # Request a new token every 100ms (10 TPS) for 2 seconds.
                1 + 0.1 * i
                for i in range(20)
            ]
        )
        token_bucket = self.create_token_bucket(max_rate=10)
        for _ in range(20):
            self.assertTrue(token_bucket.acquire(1, block=False))

    def test_acquiring_blocks_when_capacity_reached(self):
        # This is 1 token every 0.1 seconds.
        token_bucket = self.create_token_bucket(max_rate=10)
        self.timestamp_sequences.extend(
            [
                # The first acquire() happens after .1 seconds.
                0.1,
                # The second acquire() will fail because we get tokens at
                # 1 per 0.1 seconds.  We will then sleep for 0.05 seconds until we
                # get a new token.
                0.15,
                # And at 0.2 seconds we get our token.
                0.2,
                # And at 0.3 seconds we have no issues getting a token.
                # Because we're using such small units (to avoid bloating the
                # test run time), we have to go slightly over 0.3 seconds here.
                0.300001,
            ]
        )
        self.assertTrue(token_bucket.acquire(1, block=False))
        self.assertEqual(token_bucket.available_capacity, 0)
        self.assertTrue(token_bucket.acquire(1, block=True))
        self.assertEqual(token_bucket.available_capacity, 0)
        self.assertTrue(token_bucket.acquire(1, block=False))

    def test_rate_cant_go_below_min(self):
        token_bucket = self.create_token_bucket(max_rate=1, min_rate=0.2)
        self.timestamp_sequences.append(1)
        token_bucket.max_rate = 0.1
        self.assertEqual(token_bucket.max_rate, 0.2)
        self.assertEqual(token_bucket.max_capacity, 1)