1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
import unittest
from collections import deque
from copy import copy
from test.support import threading_helper
# Module-level threading requirement, evaluated once at import time.
# NOTE(review): presumably this skips every test in the module on platforms
# where threads cannot be started -- confirm against
# test.support.threading_helper.
threading_helper.requires_working_threading(module=True)
class TestDeque(unittest.TestCase):
def test_copy_race(self):
# gh-144809: Test that deque copy is thread safe. It previously
# could raise a "deque mutated during iteration" error.
d = deque(range(100))
def mutate():
for i in range(1000):
d.append(i)
if len(d) > 200:
d.popleft()
def copy_loop():
for _ in range(1000):
copy(d)
threading_helper.run_concurrently([mutate, copy_loop])
# Allow running this test file directly as a script.
if __name__ == "__main__":
    unittest.main()
|