1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
import unittest
from test.support import import_helper, threading_helper
import random
from threading import Thread, Barrier
py_bisect = import_helper.import_fresh_module('bisect', blocked=['_bisect'])
c_bisect = import_helper.import_fresh_module('bisect', fresh=['_bisect'])
NTHREADS = 4
OBJECT_COUNT = 500
class TestBase:
def do_racing_insort(self, insert_method):
def insert(data):
for _ in range(OBJECT_COUNT):
x = random.randint(-OBJECT_COUNT, OBJECT_COUNT)
insert_method(data, x)
data = list(range(OBJECT_COUNT))
self.run_concurrently(
worker_func=insert, args=(data,), nthreads=NTHREADS
)
if False:
# These functions are not thread-safe and so the list can become
# unsorted. However, we don't want Python to crash if these
# functions are used concurrently on the same sequence. This
# should also not produce any TSAN warnings.
self.assertTrue(self.is_sorted_ascending(data))
def test_racing_insert_right(self):
self.do_racing_insort(self.mod.insort_right)
def test_racing_insert_left(self):
self.do_racing_insort(self.mod.insort_left)
@staticmethod
def is_sorted_ascending(lst):
"""
Check if the list is sorted in ascending order (non-decreasing).
"""
return all(lst[i - 1] <= lst[i] for i in range(1, len(lst)))
def run_concurrently(self, worker_func, args, nthreads):
"""
Run the worker function concurrently in multiple threads.
"""
barrier = Barrier(nthreads)
def wrapper_func(*args):
# Wait for all threads to reach this point before proceeding.
barrier.wait()
worker_func(*args)
with threading_helper.catch_threading_exception() as cm:
workers = (
Thread(target=wrapper_func, args=args)
for _ in range(nthreads)
)
with threading_helper.start_threads(workers):
pass
# Worker threads should not raise any exceptions
self.assertIsNone(cm.exc_value)
@threading_helper.requires_working_threading()
class TestPyBisect(unittest.TestCase, TestBase):
mod = py_bisect
@threading_helper.requires_working_threading()
class TestCBisect(unittest.TestCase, TestBase):
mod = c_bisect
if __name__ == "__main__":
unittest.main()
|