1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
|
import unittest
from threading import Thread
from unittest import TestCase
from test.support import threading_helper
# Number of reader threads each test starts alongside its single writer.
NTHREAD = 10
# Number of objects the writer adds to the list; the same number is also the
# offset added to an object's index to form its expected value.
OBJECT_COUNT = 5_000
class C:
    """Minimal object wrapping a single value, exposed as ``v``."""

    def __init__(self, v):
        """Store *v* on the instance."""
        self.v = v

    def __repr__(self):
        # Makes instances identifiable in assertion messages and debuggers.
        return f"{type(self).__name__}({self.v!r})"
@threading_helper.requires_working_threading()
class TestList(TestCase):
    """Iterate a list from several threads while another thread grows it."""

    def _check_racing_iter(self, add_item):
        """Race NTHREAD readers against one writer growing a shared list.

        *add_item* is called as ``add_item(items, obj)`` and must add *obj*
        at the end of *items*.  The writer adds OBJECT_COUNT objects whose
        ``v`` is ``index + OBJECT_COUNT``; every reader repeatedly iterates
        the list and checks that each element it sees carries the value
        expected for its position.

        Exceptions raised in worker threads do not propagate to the thread
        that joins them, so each worker records its exception and the first
        one is re-raised here, in the test's own thread.
        """
        items = []
        failures = []  # exceptions captured inside worker threads

        def writer_func():
            try:
                for i in range(OBJECT_COUNT):
                    add_item(items, C(i + OBJECT_COUNT))
            except Exception as exc:
                # Recording the failure also releases the readers, which
                # would otherwise wait forever for the list to fill up.
                failures.append(exc)

        def reader_func():
            try:
                # Stop early once any worker has failed.
                while not failures:
                    # Sample the length *before* iterating: the loop ends
                    # only after a pass that started on the complete list.
                    count = len(items)
                    for i, x in enumerate(items):
                        self.assertEqual(x.v, i + OBJECT_COUNT)
                    if count == OBJECT_COUNT:
                        break
            except Exception as exc:
                failures.append(exc)

        writer = Thread(target=writer_func)
        readers = [Thread(target=reader_func) for _ in range(NTHREAD)]

        # Readers start first so they are already iterating when the writer
        # begins to grow the list.
        for reader in readers:
            reader.start()
        writer.start()

        writer.join()
        for reader in readers:
            reader.join()

        if failures:
            raise failures[0]

    def test_racing_iter_append(self):
        """Readers iterate while the writer grows the list with append()."""
        self._check_racing_iter(lambda items, obj: items.append(obj))

    def test_racing_iter_extend(self):
        """Readers iterate while the writer grows the list with extend()."""
        self._check_racing_iter(lambda items, obj: items.extend([obj]))
# Run the tests in this file when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|