1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
|
import codecs
import io
import threading
from unittest import TestCase
from test.support import threading_helper
from test.support.threading_helper import run_concurrently
from random import randint
from io import BytesIO
from sys import getsizeof
threading_helper.requires_working_threading(module=True)
class TestBytesIO(TestCase):
# Test pretty much everything that can break under free-threading.
# Non-deterministic, but at least one of these things will fail if
# BytesIO object is not free-thread safe.
def check(self, funcs, *args):
barrier = threading.Barrier(len(funcs))
threads = []
for func in funcs:
thread = threading.Thread(target=func, args=(barrier, *args))
threads.append(thread)
with threading_helper.start_threads(threads):
pass
@threading_helper.requires_working_threading()
@threading_helper.reap_threads
def test_free_threading(self):
"""Test for segfaults and aborts."""
def write(barrier, b, *ignore):
barrier.wait()
try: b.write(b'0' * randint(100, 1000))
except ValueError: pass # ignore write fail to closed file
def writelines(barrier, b, *ignore):
barrier.wait()
b.write(b'0\n' * randint(100, 1000))
def truncate(barrier, b, *ignore):
barrier.wait()
try: b.truncate(0)
except BufferError: pass # ignore exported buffer
def read(barrier, b, *ignore):
barrier.wait()
b.read()
def read1(barrier, b, *ignore):
barrier.wait()
b.read1()
def readline(barrier, b, *ignore):
barrier.wait()
b.readline()
def readlines(barrier, b, *ignore):
barrier.wait()
b.readlines()
def readinto(barrier, b, into, *ignore):
barrier.wait()
b.readinto(into)
def close(barrier, b, *ignore):
barrier.wait()
b.close()
def getvalue(barrier, b, *ignore):
barrier.wait()
b.getvalue()
def getbuffer(barrier, b, *ignore):
barrier.wait()
b.getbuffer()
def iter(barrier, b, *ignore):
barrier.wait()
list(b)
def getstate(barrier, b, *ignore):
barrier.wait()
b.__getstate__()
def setstate(barrier, b, st, *ignore):
barrier.wait()
b.__setstate__(st)
def sizeof(barrier, b, *ignore):
barrier.wait()
getsizeof(b)
self.check([write] * 10, BytesIO())
self.check([writelines] * 10, BytesIO())
self.check([write] * 10 + [truncate] * 10, BytesIO())
self.check([truncate] + [read] * 10, BytesIO(b'0\n'*204800))
self.check([truncate] + [read1] * 10, BytesIO(b'0\n'*204800))
self.check([truncate] + [readline] * 10, BytesIO(b'0\n'*20480))
self.check([truncate] + [readlines] * 10, BytesIO(b'0\n'*20480))
self.check([truncate] + [readinto] * 10, BytesIO(b'0\n'*204800), bytearray(b'0\n'*204800))
self.check([close] + [write] * 10, BytesIO())
self.check([truncate] + [getvalue] * 10, BytesIO(b'0\n'*204800))
self.check([truncate] + [getbuffer] * 10, BytesIO(b'0\n'*204800))
self.check([truncate] + [iter] * 10, BytesIO(b'0\n'*20480))
self.check([truncate] + [getstate] * 10, BytesIO(b'0\n'*204800))
self.check([truncate] + [setstate] * 10, BytesIO(b'0\n'*204800), (b'123', 0, None))
self.check([truncate] + [sizeof] * 10, BytesIO(b'0\n'*204800))
# no tests for seek or tell because they don't break anything
class IncrementalNewlineDecoderTest(TestCase):
def make_decoder(self):
utf8_decoder = codecs.getincrementaldecoder('utf-8')()
return io.IncrementalNewlineDecoder(utf8_decoder, translate=True)
def test_concurrent_reset(self):
decoder = self.make_decoder()
def worker():
for _ in range(100):
decoder.reset()
run_concurrently(worker_func=worker, nthreads=2)
def test_concurrent_decode(self):
decoder = self.make_decoder()
def worker():
for _ in range(100):
decoder.decode(b"line\r\n", final=False)
run_concurrently(worker_func=worker, nthreads=2)
def test_concurrent_getstate_setstate(self):
decoder = self.make_decoder()
state = decoder.getstate()
def getstate_worker():
for _ in range(100):
decoder.getstate()
def setstate_worker():
for _ in range(100):
decoder.setstate(state)
run_concurrently([getstate_worker] * 2 + [setstate_worker] * 2)
def test_concurrent_decode_and_reset(self):
decoder = self.make_decoder()
def decode_worker():
for _ in range(100):
decoder.decode(b"line\r\n", final=False)
def reset_worker():
for _ in range(100):
decoder.reset()
run_concurrently([decode_worker] * 2 + [reset_worker] * 2)
|