1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
|
""" Tests more special use-cases.
"""
import gc
import sys
import queue
import threading
from testutils import ensure_test_files, test_file1
import pytest
import imageio_ffmpeg
# True when the running interpreter lists a built-in module named "__pypy__";
# test_threading() uses this flag to xfail instead of running its threads.
IS_PYPY = "__pypy__" in sys.builtin_module_names
def setup_module():
    """Module-level setup: call ``ensure_test_files()`` from ``testutils``.

    The name follows pytest's ``setup_module`` convention; the function is
    also invoked directly when this file is executed as a script.
    Presumably this makes ``test_file1`` available to the tests -- confirm
    against ``testutils``.
    """
    ensure_test_files()
def make_iterator(q, n):
    """Open ``n`` frame readers in sequence and queue each one's first frame.

    Used as the thread target in ``test_threading``.

    Parameters:
        q: queue-like object; ``q.put`` is called once per reader.
        n: number of readers to create; nothing happens when ``n <= 0``.

    Each reader is not closed explicitly; it is simply dropped when the
    loop moves on to the next iteration (or the function returns).
    """
    for _ in range(n):
        reader = imageio_ffmpeg.read_frames(test_file1)
        next(reader)  # meta data
        q.put(next(reader))  # first frame
def test_threading():
    """Read first frames from many threads concurrently (see issue #20).

    Starts ``num_threads`` daemon threads that each run ``make_iterator``
    for ``num_frames`` readers, then pulls every expected frame off the
    shared queue. ``q.get`` raises ``queue.Empty`` if a frame does not
    arrive within 20 seconds, which fails the test.
    """
    # See issue #20
    if IS_PYPY:
        pytest.xfail("These threads hang on pypy for some reason.")

    num_threads = 16
    num_frames = 5
    total_frames = num_threads * num_frames

    q = queue.Queue()
    threads = []
    for _ in range(num_threads):
        worker = threading.Thread(
            target=make_iterator, args=(q, num_frames), daemon=True
        )
        worker.start()
        threads.append(worker)

    # Consume exactly one queue item per frame the workers will produce.
    for count in range(total_frames):
        print(count, end=" ")
        gc.collect()  # this seems to help invoke the segfault earlier
        q.get(timeout=20)
# Allow running this file directly as a script (without pytest):
# call the module setup first, then the threading test.
if __name__ == "__main__":
    setup_module()
    test_threading()
|