1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
# Copyright 2013 Arnaud Porterie
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import random
import time
import unittest
import threading
from ratelimiter import RateLimiter
class Timer(object):
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, exc_type, exc_value, exc_tb):
self.stop = time.time()
self.duration = self.stop - self.start
class TestBasic(unittest.TestCase):
period = 0.01
max_calls = 10
def setUp(self):
random.seed()
def validate_call_times(self, ts, max_calls, period):
# Overall verification: total call duration should span over more than
# the corresponding number of periods.
timespan = math.ceil((ts[-1] - ts[0]) / period)
self.assertGreaterEqual(max_calls, len(ts) / timespan)
# Sliding verification: no group of 'max_calls' items should span over
# less than a period.
for i in range(len(ts) - max_calls):
self.assertGreaterEqual(ts[i + max_calls] - ts[i], period)
def test_bad_args(self):
self.assertRaises(ValueError, RateLimiter, -1, self.period)
self.assertRaises(ValueError, RateLimiter, +1, -self.period)
def test_limit_1(self):
with Timer() as timer:
obj = RateLimiter(self.max_calls, self.period)
for i in range(self.max_calls + 1):
with obj:
# After the 'self.max_calls' iteration the execution
# inside the context manager should be blocked
# for the 'self.period' seconds.
pass
# The sum of the time in the iterations without the rate limit blocking
# is way lower than 'self.period'. If the duration of the all
# iterations is greater or equal to the 'self.period' then blocking
# and sleeping after the 'self.max_calls' iteration has been occured.
self.assertGreaterEqual(timer.duration, self.period)
def test_limit_2(self):
calls = []
obj = RateLimiter(self.max_calls, self.period)
for i in range(3 * self.max_calls):
with obj:
calls.append(time.time())
self.assertEqual(len(calls), 3 * self.max_calls)
self.validate_call_times(calls, self.max_calls, self.period)
def test_decorator_1(self):
@RateLimiter(self.max_calls, self.period)
def f():
# After the 'self.max_calls' iteration the execution
# of the function should be blocked for the 'self.period' seconds.
pass
with Timer() as timer:
[f() for i in range(self.max_calls + 1)]
# The sum of the time in the iterations without the rate limit blocking
# is way lower than 'self.period'. If the duration of the all
# iterations is greater or equal to the 'self.period' then blocking
# and sleeping after the 'self.max_calls' iteration has been occured.
self.assertGreaterEqual(timer.duration, self.period)
def test_decorator_2(self):
@RateLimiter(self.max_calls, self.period)
def f():
f.calls.append(time.time())
f.calls = []
[f() for i in range(3 * self.max_calls)]
self.assertEqual(len(f.calls), 3 * self.max_calls)
self.validate_call_times(f.calls, self.max_calls, self.period)
def test_random(self):
for _ in range(10):
calls = []
obj = RateLimiter(self.max_calls, self.period)
for i in range(random.randint(10, 50)):
with obj:
calls.append(time.time())
self.validate_call_times(calls, self.max_calls, self.period)
def test_threading(self):
@RateLimiter(self.max_calls, self.period)
def f():
# After the 'self.max_calls' iteration the execution
# of the function should be blocked for the 'self.period' seconds.
pass
with Timer() as timer:
threads = []
for i in range(self.max_calls + 1):
# Running each target in it's own thread should not affect
# the rate limiting
t = threading.Thread(target=f)
threads.append(t)
t.start()
[t.join() for t in threads]
# The sum of the time in the iterations without the rate limit blocking
# is way lower than 'self.period'. If the duration of the all
# iterations is greater or equal to the 'self.period' then blocking
# and sleeping after the 'self.max_calls' iteration has been occured.
self.assertGreaterEqual(timer.duration, self.period)
if __name__ == "__main__":
unittest.main()
|