"""Multi-threaded stress tests for functools.lru_cache."""
import random
import unittest
from functools import lru_cache
from threading import Barrier, Thread
from test.support import threading_helper
@threading_helper.requires_working_threading()
class TestLRUCache(unittest.TestCase):
def _test_concurrent_operations(self, maxsize):
num_threads = 10
b = Barrier(num_threads)
@lru_cache(maxsize=maxsize)
def func(arg=0):
return object()
def thread_func():
b.wait()
for i in range(1000):
r = random.randint(0, 1000)
if i < 800:
func(i)
elif i < 900:
func.cache_info()
else:
func.cache_clear()
threads = []
for i in range(num_threads):
t = Thread(target=thread_func)
threads.append(t)
with threading_helper.start_threads(threads):
pass
def test_concurrent_operations_unbounded(self):
self._test_concurrent_operations(maxsize=None)
def test_concurrent_operations_bounded(self):
self._test_concurrent_operations(maxsize=128)
def _test_reentrant_cache_clear(self, maxsize):
num_threads = 10
b = Barrier(num_threads)
@lru_cache(maxsize=maxsize)
def func(arg=0):
func.cache_clear()
return object()
def thread_func():
b.wait()
for i in range(1000):
func(random.randint(0, 10000))
threads = []
for i in range(num_threads):
t = Thread(target=thread_func)
threads.append(t)
with threading_helper.start_threads(threads):
pass
def test_reentrant_cache_clear_unbounded(self):
self._test_reentrant_cache_clear(maxsize=None)
def test_reentrant_cache_clear_bounded(self):
self._test_reentrant_cache_clear(maxsize=128)
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|