=========
Threading
=========

.. py:currentmodule:: tables


Background
==========

Several bug reports have been filed by users in the past regarding the
impossibility of using PyTables in multi-threaded programs.

The problem was mainly related to an internal registry that forced the
sharing of HDF5 file handles across multiple threads.

In PyTables 3.1.0 the code for file handle management was completely
redesigned (see the *Backward incompatible changes* section in
:doc:`../release-notes/RELEASE_NOTES_v3.1.x`) to be simpler and more
transparent, and to allow the use of PyTables in multi-threaded programs.

Citing the :doc:`../release-notes/RELEASE_NOTES_v3.1.x`::

    It is important to stress that the new implementation still has an
    internal registry (implementation detail) and it is still
    **not thread safe**.
    For now, a smart enough developer should be able to use PyTables in a
    multi-threaded program without too many headaches.


A common scheme for concurrency
===============================

Although it is probably not the most efficient or elegant way to solve a
certain class of problems, many users seem to like the possibility of
loading a portion of the data and processing it inside a *thread function*,
using multiple threads to process the entire dataset.

Each thread is responsible for:

* opening the (same) HDF5 file for reading,
* loading data from it, and
* closing the HDF5 file itself.

Each file handle is for the exclusive use of the thread that opened it, and
file handles are never shared across threads.

In order to do this safely with PyTables, some care must be taken when
opening and closing HDF5 files to ensure the correct behaviour of the
internal machinery used to manage HDF5 file handles.


Very simple solution
====================

A very simple solution for this kind of scenario is to use a
:class:`threading.Lock` around the parts of the code that are considered
critical, e.g. the :func:`open_file` function and the :meth:`File.close`
method::

    import threading

    import tables as tb

    lock = threading.Lock()

    def synchronized_open_file(*args, **kwargs):
        with lock:
            return tb.open_file(*args, **kwargs)

    def synchronized_close_file(self, *args, **kwargs):
        with lock:
            return self.close(*args, **kwargs)


The :func:`synchronized_open_file` and :func:`synchronized_close_file`
functions can then be used in the *thread function* to open and close the
HDF5 file::

    import numpy as np
    import tables as tb

    def run(filename, path, inqueue, outqueue):
        h5file = None
        try:
            yslice = inqueue.get()
            h5file = synchronized_open_file(filename, mode='r')
            h5array = h5file.get_node(path)
            data = h5array[yslice, ...]
            psum = np.sum(data)
        except Exception as e:
            outqueue.put(e)
        else:
            outqueue.put(psum)
        finally:
            # guard against failures in synchronized_open_file itself,
            # in which case h5file is still None
            if h5file is not None:
                synchronized_close_file(h5file)


Finally, the main function of the program:

* instantiates the input and output :class:`queue.Queue` objects,
* starts all the threads,
* sends the processing requests on the input :class:`queue.Queue`,
* collects the results reading from the output :class:`queue.Queue`, and
* performs the finalization actions (:meth:`threading.Thread.join`).

.. code-block:: python

    import os
    import queue
    import threading

    import numpy as np
    import tables as tb

    SIZE = 100
    NTHREADS = 5
    FILENAME = 'simple_threading.h5'
    H5PATH = '/array'

    def create_test_file(filename):
        data = np.random.rand(SIZE, SIZE)

        with tb.open_file(filename, 'w') as h5file:
            h5file.create_array('/', 'array', title="Test Array", obj=data)

    def chunk_generator(data_size, nchunks):
        chunk_size = int(np.ceil(data_size / nchunks))
        for start in range(0, data_size, chunk_size):
            yield slice(start, start + chunk_size)

    def main():
        # generate the test data
        if not os.path.exists(FILENAME):
            create_test_file(FILENAME)

        threads = []
        inqueue = queue.Queue()
        outqueue = queue.Queue()

        # start all threads
        for i in range(NTHREADS):
            thread = threading.Thread(
                target=run, args=(FILENAME, H5PATH, inqueue, outqueue))
            thread.start()
            threads.append(thread)

        # push requests in the input queue
        for yslice in chunk_generator(SIZE, len(threads)):
            inqueue.put(yslice)

        # collect results
        try:
            mean_ = 0.

            for i in range(len(threads)):
                out = outqueue.get()
                if isinstance(out, Exception):
                    raise out
                else:
                    mean_ += out

            mean_ /= SIZE * SIZE

        finally:
            for thread in threads:
                thread.join()

        # print results
        print('Mean: {}'.format(mean_))

    if __name__ == '__main__':
        main()

The program in the example computes the mean value of a potentially huge
dataset, splitting the computation across :data:`NTHREADS` (5 in this case)
threads.

The complete and working code of this example (Python 3 is required) can be
found in the :file:`examples` directory:
:download:`simple_threading.py <../../../examples/simple_threading.py>`.

The approach presented in this section is very simple and readable, but it
has the **drawback** that the user code has to be modified to replace the
:func:`open_file` and :meth:`File.close` calls with their safe versions
(:func:`synchronized_open_file` and :func:`synchronized_close_file`).

Also, the solution shown in the example does not cover the entire PyTables
API (e.g., although it is not recommended, HDF5 files can also be opened
using the :class:`File` constructor directly) and makes it impossible to use
*pythonic* constructs like the *with* statement::

    with tb.open_file(filename) as h5file:
        do_something(h5file)
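
That said, a limited form of *with* statement support can be recovered on
top of this approach by wrapping the two synchronized helpers in a small
context manager. The following is only a sketch: the ``synchronized_file``
name is illustrative and not part of PyTables::

    import contextlib

    @contextlib.contextmanager
    def synchronized_file(*args, **kwargs):
        # hypothetical helper built on the synchronized_open_file() and
        # synchronized_close_file() functions defined above
        h5file = synchronized_open_file(*args, **kwargs)
        try:
            yield h5file
        finally:
            synchronized_close_file(h5file)

    with synchronized_file(filename) as h5file:
        do_something(h5file)

User code still has to be modified to use the wrapper, though; the
monkey-patching approach described in the next section removes this
limitation.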


Monkey-patching PyTables
========================

An alternative to the `Very simple solution`_ presented in the previous
section is to monkey-patch the PyTables package, replacing some of its
components with more thread-safe versions of themselves::

    import functools
    import threading

    import tables as tb
    import tables.file as _tables_file

    class ThreadsafeFileRegistry(_tables_file._FileRegistry):
        lock = threading.RLock()

        @property
        def handlers(self):
            # return a copy so that iterating over the open file handlers
            # is safe while other threads modify the registry
            return self._handlers.copy()

        def add(self, handler):
            with self.lock:
                return super().add(handler)

        def remove(self, handler):
            with self.lock:
                return super().remove(handler)

        def close_all(self):
            with self.lock:
                return super().close_all()

    class ThreadsafeFile(_tables_file.File):
        def __init__(self, *args, **kwargs):
            with ThreadsafeFileRegistry.lock:
                super().__init__(*args, **kwargs)

        def close(self):
            with ThreadsafeFileRegistry.lock:
                super().close()

    @functools.wraps(tb.open_file)
    def synchronized_open_file(*args, **kwargs):
        with ThreadsafeFileRegistry.lock:
            return _tables_file._original_open_file(*args, **kwargs)

    # monkey patch the tables package
    _tables_file._original_open_file = _tables_file.open_file
    _tables_file.open_file = synchronized_open_file
    tb.open_file = synchronized_open_file

    _tables_file._original_File = _tables_file.File
    _tables_file.File = ThreadsafeFile
    tb.File = ThreadsafeFile

    _tables_file._open_files = ThreadsafeFileRegistry()
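
Assuming the patching code above is collected in its own module (named, say,
:file:`threadsafe_tables.py`; the module name is purely illustrative), it is
enough to import it once, before any worker thread opens an HDF5 file::

    import threadsafe_tables  # hypothetical module containing the patch

    import tables as tb

    # the standard API, including the with statement, is now serialized
    # by the registry lock
    with tb.open_file('simple_threading.h5', mode='r') as h5file:
        print(h5file.root)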


At this point PyTables can be used transparently in the example program
presented in the previous section.
In particular, the standard PyTables API (including *with* statements) can
be used in the *thread function*::

    def run(filename, path, inqueue, outqueue):
        try:
            yslice = inqueue.get()
            with tb.open_file(filename, mode='r') as h5file:
                h5array = h5file.get_node(path)
                data = h5array[yslice, ...]
            psum = np.sum(data)
        except Exception as e:
            outqueue.put(e)
        else:
            outqueue.put(psum)


The complete code of this version of the example can be found in the
:file:`examples` folder:
:download:`threading_monkeypatch.py <../../../examples/threading_monkeypatch.py>`.
Python 3 is required.
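
Once the monkey-patch is in place, standard concurrency tools from the
Python standard library can also be used directly. The following sketch
(not one of the examples shipped with PyTables) computes the same mean
value using a :class:`concurrent.futures.ThreadPoolExecutor` instead of
explicit threads and queues; it assumes the patch shown above has already
been applied and that the test file from the first example exists::

    import concurrent.futures

    import numpy as np
    import tables as tb

    def partial_sum(filename, path, yslice):
        # each task opens and closes its own file handle; with the
        # monkey-patched API this is safe from multiple threads
        with tb.open_file(filename, mode='r') as h5file:
            return np.sum(h5file.get_node(path)[yslice, ...])

    def threaded_mean(filename, path, size, nthreads):
        chunk = int(np.ceil(size / nthreads))
        slices = [slice(start, start + chunk)
                  for start in range(0, size, chunk)]
        with concurrent.futures.ThreadPoolExecutor(nthreads) as executor:
            futures = [executor.submit(partial_sum, filename, path, yslice)
                       for yslice in slices]
            # Future.result() re-raises any exception from the worker
            total = sum(future.result() for future in futures)
        return total / (size * size)

    print('Mean: {}'.format(threaded_mean(
        'simple_threading.h5', '/array', 100, 5)))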