=========
Threading
=========

.. py:currentmodule:: tables


Background
==========

Several bug reports have been filed in the past by users who were unable to
use PyTables in multi-threaded programs.
The problem was mainly related to an internal registry that forced the
sharing of HDF5 file handles across multiple threads.
In PyTables 3.1.0 the code for file handle management was completely
redesigned (see the *Backward incompatible changes* section in
:doc:`../release-notes/RELEASE_NOTES_v3.1.x`) to be simpler and more
transparent, and to allow the use of PyTables in multi-threaded programs.

Citing the :doc:`../release-notes/RELEASE_NOTES_v3.1.x`::

    It is important to stress that the new implementation still has an
    internal registry (implementation detail) and it is still
    **not thread safe**.

Nevertheless, a careful developer should now be able to use PyTables in a
multi-threaded program without too many headaches.

A common schema for concurrency
===============================

Although it is probably not the most efficient or elegant way to solve a
certain class of problems, many users seem to like the possibility of loading
a portion of data and processing it inside a *thread function*, using
multiple threads to process the entire dataset.
Each thread is responsible for:

* opening the (same) HDF5 file for reading,
* loading data from it, and
* closing the HDF5 file itself.

Each file handle is for the exclusive use of the thread that opened it, and
file handles are never shared across threads.

In order to do this safely with PyTables, some care must be taken when
opening and closing HDF5 files, so as to ensure the correct behaviour of the
internal machinery used to manage HDF5 file handles.

Very simple solution
====================

A very simple solution for this kind of scenario is to use a
:class:`threading.Lock` around the parts of the code that are considered
critical, e.g. the :func:`open_file` function and the :meth:`File.close`
method::

    import threading

    import tables as tb

    lock = threading.Lock()


    def synchronized_open_file(*args, **kwargs):
        with lock:
            return tb.open_file(*args, **kwargs)


    def synchronized_close_file(self, *args, **kwargs):
        with lock:
            return self.close(*args, **kwargs)

The :func:`synchronized_open_file` and :func:`synchronized_close_file`
functions can then be used in the *thread function* to open and close the
HDF5 file::

    import numpy as np
    import tables as tb


    def run(filename, path, inqueue, outqueue):
        h5file = None
        try:
            yslice = inqueue.get()
            h5file = synchronized_open_file(filename, mode='r')
            h5array = h5file.get_node(path)
            data = h5array[yslice, ...]
            psum = np.sum(data)
        except Exception as e:
            outqueue.put(e)
        else:
            outqueue.put(psum)
        finally:
            if h5file is not None:
                synchronized_close_file(h5file)

Finally, the main function of the program:

* instantiates the input and output :class:`queue.Queue`,
* starts all threads,
* sends the processing requests on the input :class:`queue.Queue`,
* collects results reading from the output :class:`queue.Queue`, and
* performs finalization actions (:meth:`threading.Thread.join`).

.. code-block:: python

    import os
    import queue
    import threading

    import numpy as np
    import tables as tb

    SIZE = 100
    NTHREADS = 5
    FILENAME = 'simple_threading.h5'
    H5PATH = '/array'


    def create_test_file(filename):
        data = np.random.rand(SIZE, SIZE)

        with tb.open_file(filename, 'w') as h5file:
            h5file.create_array('/', 'array', title="Test Array", obj=data)


    def chunk_generator(data_size, nchunks):
        chunk_size = int(np.ceil(data_size / nchunks))
        for start in range(0, data_size, chunk_size):
            yield slice(start, start + chunk_size)


    def main():
        # generate the test data
        if not os.path.exists(FILENAME):
            create_test_file(FILENAME)

        threads = []
        inqueue = queue.Queue()
        outqueue = queue.Queue()

        # start all threads
        for i in range(NTHREADS):
            thread = threading.Thread(
                target=run, args=(FILENAME, H5PATH, inqueue, outqueue))
            thread.start()
            threads.append(thread)

        # push requests in the input queue
        for yslice in chunk_generator(SIZE, len(threads)):
            inqueue.put(yslice)

        # collect results
        try:
            mean_ = 0.
            for i in range(len(threads)):
                out = outqueue.get()
                if isinstance(out, Exception):
                    raise out
                else:
                    mean_ += out

            mean_ /= SIZE * SIZE
        finally:
            for thread in threads:
                thread.join()

        # print results
        print('Mean: {}'.format(mean_))


    if __name__ == '__main__':
        main()

The program in the example computes the mean value of a potentially huge
dataset, splitting the computation across :data:`NTHREADS` (5 in this case)
threads.

The complete and working code of this example (Python 3 is required) can be
found in the :file:`examples` directory:
:download:`simple_threading.py <../../../examples/simple_threading.py>`.

The approach presented in this section is very simple and readable, but it
has the **drawback** that the user code has to be modified to replace
:func:`open_file` and :meth:`File.close` calls with their safe versions
(:func:`synchronized_open_file` and :func:`synchronized_close_file`).

Also, the solution shown in the example does not cover the entire PyTables
API (e.g., although not recommended, HDF5 files can be opened using the
:class:`File` constructor) and makes it impossible to use *pythonic*
constructs like the *with* statement::

    with tb.open_file(filename) as h5file:
        do_something(h5file)
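If one wants to keep the lock-based approach, the loss of the *with*
statement can be partially worked around by wrapping the synchronized open
and close calls in a small context manager. The following is only a sketch:
the `synchronized_open` helper is an illustrative name introduced here, not
part of the PyTables API, and the lock is assumed to be the same
module-level :class:`threading.Lock` used above:

```python
import threading
from contextlib import contextmanager

import tables as tb

lock = threading.Lock()


@contextmanager
def synchronized_open(*args, **kwargs):
    """Serialize open_file() and File.close() around a module-level lock."""
    with lock:
        h5file = tb.open_file(*args, **kwargs)
    try:
        yield h5file
    finally:
        with lock:
            h5file.close()
```

Each thread can then write ``with synchronized_open(filename, mode='r') as
h5file: ...``; only the open and close calls are serialized, while the reads
themselves run outside the lock.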

Monkey-patching PyTables
========================

An alternative implementation, with respect to the `Very simple solution`_
presented in the previous section, consists in monkey-patching the PyTables
package to replace some of its components with thread-safe versions of
themselves::

    import functools
    import threading

    import tables as tb
    import tables.file as _tables_file


    class ThreadsafeFileRegistry(_tables_file._FileRegistry):
        lock = threading.RLock()

        @property
        def handlers(self):
            return self._handlers.copy()

        def add(self, handler):
            with self.lock:
                return super().add(handler)

        def remove(self, handler):
            with self.lock:
                return super().remove(handler)

        def close_all(self):
            with self.lock:
                return super().close_all()


    class ThreadsafeFile(_tables_file.File):
        def __init__(self, *args, **kwargs):
            with ThreadsafeFileRegistry.lock:
                super().__init__(*args, **kwargs)

        def close(self):
            with ThreadsafeFileRegistry.lock:
                super().close()


    @functools.wraps(tb.open_file)
    def synchronized_open_file(*args, **kwargs):
        with ThreadsafeFileRegistry.lock:
            return _tables_file._original_open_file(*args, **kwargs)


    # monkey patch the tables package
    _tables_file._original_open_file = _tables_file.open_file
    _tables_file.open_file = synchronized_open_file
    tb.open_file = synchronized_open_file

    _tables_file._original_File = _tables_file.File
    _tables_file.File = ThreadsafeFile
    tb.File = ThreadsafeFile

    _tables_file._open_files = ThreadsafeFileRegistry()

At this point PyTables can be used transparently in the example program
presented in the previous section.
In particular, the standard PyTables API (including *with* statements) can
be used in the *thread function*::

    def run(filename, path, inqueue, outqueue):
        try:
            yslice = inqueue.get()
            with tb.open_file(filename, mode='r') as h5file:
                h5array = h5file.get_node(path)
                data = h5array[yslice, ...]
                psum = np.sum(data)
        except Exception as e:
            outqueue.put(e)
        else:
            outqueue.put(psum)

The complete code of this version of the example can be found in the
:file:`examples` folder:
:download:`threading_monkeypatch.py <../../../examples/threading_monkeypatch.py>`.
Python 3 is required.
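Once the standard API is usable from threads, the hand-rolled queue and
thread management of the example can also be replaced by
:class:`concurrent.futures.ThreadPoolExecutor` from the standard library.
The following is only a sketch under the same assumptions as the examples
above (a dataset at ``/array`` and the patched, serialized
``tb.open_file``); the ``partial_sum`` and ``parallel_mean`` names are
illustrative, not part of PyTables:

```python
import numpy as np
import tables as tb
from concurrent.futures import ThreadPoolExecutor


def partial_sum(filename, path, yslice):
    # Standard PyTables API inside the worker: each thread opens and
    # closes its own file handle, which is never shared across threads.
    with tb.open_file(filename, mode='r') as h5file:
        return np.sum(h5file.get_node(path)[yslice, ...])


def parallel_mean(filename, path, size, nthreads=5):
    # Split the rows into one slice per thread, as chunk_generator() does.
    chunk = int(np.ceil(size / nthreads))
    slices = [slice(start, start + chunk) for start in range(0, size, chunk)]
    with ThreadPoolExecutor(max_workers=nthreads) as pool:
        psums = pool.map(lambda ys: partial_sum(filename, path, ys), slices)
        return sum(psums) / (size * size)
```

The executor takes care of starting the threads, dispatching the work items
and joining the workers, so the explicit :class:`queue.Queue` plumbing of
the earlier example is no longer needed.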