#!/usr/bin/env python3
import math
import queue
import functools
import threading
from pathlib import Path
import numpy as np
import tables as tb
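
# Thread-safety shims: PyTables keeps a global registry of open file handles,
# so every operation that touches that registry (and every file open/close)
# is serialized behind a single re-entrant lock.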
class ThreadsafeFileRegistry(tb.file._FileRegistry):
    lock = threading.RLock()

    @property
    def handlers(self):
        return self._handlers.copy()

    def add(self, handler):
        with self.lock:
            return super().add(handler)

    def remove(self, handler):
        with self.lock:
            return super().remove(handler)

    def close_all(self):
        with self.lock:
            return super().close_all(self.handlers)
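

# File subclass whose construction (which opens the file) and close() are
# serialized through the same registry lock.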
class ThreadsafeFile(tb.file.File):
    def __init__(self, *args, **kwargs):
        with ThreadsafeFileRegistry.lock:
            super().__init__(*args, **kwargs)

    def close(self):
        with ThreadsafeFileRegistry.lock:
            super().close()
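

# Replacement for tables.open_file() that holds the registry lock while the
# original implementation opens the file.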
@functools.wraps(tb.open_file)
def synchronized_open_file(*args, **kwargs):
    with ThreadsafeFileRegistry.lock:
        return tb.file._original_open_file(*args, **kwargs)


# Monkey patch the tables package so that every caller of tb.open_file()
# and tb.File goes through the thread-safe wrappers defined above.
tb.file._original_open_file = tb.file.open_file
tb.file.open_file = synchronized_open_file
tb.open_file = synchronized_open_file

tb.file._original_File = tb.file.File
tb.file.File = ThreadsafeFile
tb.File = ThreadsafeFile

tb.file._open_files = ThreadsafeFileRegistry()

SIZE = 100
NTHREADS = 5
FILENAME = "simple_threading.h5"
H5PATH = "/array"
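

# Write a SIZE x SIZE array of random values to a new HDF5 file under /array.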
def create_test_file(filename):
    data = np.random.rand(SIZE, SIZE)
    with tb.open_file(filename, "w") as h5file:
        h5file.create_array("/", "array", title="Test Array", obj=data)
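

# Yield contiguous slices of ceil(data_size / nchunks) rows that together
# cover range(data_size).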
def chunk_generator(data_size, nchunks):
    chunk_size = math.ceil(data_size / nchunks)
    for start in range(0, data_size, chunk_size):
        yield slice(start, start + chunk_size)
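

# Worker body: take one row slice from the input queue, read that slab from
# the HDF5 file, and put its partial sum (or any raised exception) on the
# output queue.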
def run(filename, path, inqueue, outqueue):
    try:
        yslice = inqueue.get()
        with tb.open_file(filename, mode="r") as h5file:
            h5array = h5file.get_node(path)
            data = h5array[yslice, ...]
            psum = np.sum(data)
    except Exception as e:
        outqueue.put(e)
    else:
        outqueue.put(psum)
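

# Create the test file if needed, start NTHREADS reader threads, feed one row
# slice to each through the input queue, and combine the partial sums into the
# mean of the whole array.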
def main():
    # generate the test data
    if not Path(FILENAME).exists():
        create_test_file(FILENAME)

    threads = []
    inqueue = queue.Queue()
    outqueue = queue.Queue()

    # start all threads
    for _ in range(NTHREADS):
        thread = threading.Thread(
            target=run, args=(FILENAME, H5PATH, inqueue, outqueue)
        )
        thread.start()
        threads.append(thread)

    # push requests in the input queue
    for yslice in chunk_generator(SIZE, len(threads)):
        inqueue.put(yslice)

    # collect results
    try:
        mean_ = 0
        for _ in range(len(threads)):
            out = outqueue.get()
            if isinstance(out, Exception):
                raise out
            else:
                mean_ += out
        mean_ /= SIZE * SIZE
    finally:
        for thread in threads:
            thread.join()

    # print results
    print(f"Mean: {mean_}")


if __name__ == "__main__":
    main()