File: test_list.py

package info (click to toggle)
python3.14 3.14.3-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 164,024 kB
  • sloc: python: 758,439; ansic: 718,653; xml: 31,250; sh: 5,983; cpp: 4,093; makefile: 2,007; objc: 787; lisp: 502; javascript: 136; asm: 75; csh: 12
file content (117 lines) | stat: -rw-r--r-- 3,027 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import unittest

from threading import Thread, Barrier
from unittest import TestCase

from test.support import threading_helper


NTHREAD = 10
OBJECT_COUNT = 5_000


class C:
    def __init__(self, v):
        self.v = v


@threading_helper.requires_working_threading()
class TestList(TestCase):
    def test_racing_iter_append(self):
        l = []

        barrier = Barrier(NTHREAD + 1)
        def writer_func(l):
            barrier.wait()
            for i in range(OBJECT_COUNT):
                l.append(C(i + OBJECT_COUNT))

        def reader_func(l):
            barrier.wait()
            while True:
                count = len(l)
                for i, x in enumerate(l):
                    self.assertEqual(x.v, i + OBJECT_COUNT)
                if count == OBJECT_COUNT:
                    break

        writer = Thread(target=writer_func, args=(l,))
        readers = []
        for x in range(NTHREAD):
            reader = Thread(target=reader_func, args=(l,))
            readers.append(reader)
            reader.start()

        writer.start()
        writer.join()
        for reader in readers:
            reader.join()

    def test_racing_iter_extend(self):
        l = []

        barrier = Barrier(NTHREAD + 1)
        def writer_func():
            barrier.wait()
            for i in range(OBJECT_COUNT):
                l.extend([C(i + OBJECT_COUNT)])

        def reader_func():
            barrier.wait()
            while True:
                count = len(l)
                for i, x in enumerate(l):
                    self.assertEqual(x.v, i + OBJECT_COUNT)
                if count == OBJECT_COUNT:
                    break

        writer = Thread(target=writer_func)
        readers = []
        for x in range(NTHREAD):
            reader = Thread(target=reader_func)
            readers.append(reader)
            reader.start()

        writer.start()
        writer.join()
        for reader in readers:
            reader.join()

    def test_store_list_int(self):
        def copy_back_and_forth(b, l):
            b.wait()
            for _ in range(100):
                l[0] = l[1]
                l[1] = l[0]

        l = [0, 1]
        barrier = Barrier(NTHREAD)
        threads = [Thread(target=copy_back_and_forth, args=(barrier, l))
                   for _ in range(NTHREAD)]
        with threading_helper.start_threads(threads):
            pass

    # gh-145036: race condition with list.__sizeof__()
    def test_list_sizeof_free_threaded_build(self):
        L = []

        def mutate_function():
            for _ in range(100):
                L.append(1)
                L.pop()

        def size_function():
            for _ in range(100):
                L.__sizeof__()

        threads = []
        for _ in range(4):
            threads.append(Thread(target=mutate_function))
            threads.append(Thread(target=size_function))

        with threading_helper.start_threads(threads):
            pass


if __name__ == "__main__":
    unittest.main()