File: thread.h

package info (click to toggle)
mrtrix3 3.0.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 13,712 kB
  • sloc: cpp: 129,776; python: 9,494; sh: 593; makefile: 234; xml: 47
file content (385 lines) | stat: -rw-r--r-- 13,946 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
/* Copyright (c) 2008-2022 the MRtrix3 contributors.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Covered Software is provided under this License on an "as is"
 * basis, without warranty of any kind, either expressed, implied, or
 * statutory, including, without limitation, warranties that the
 * Covered Software is free of defects, merchantable, fit for a
 * particular purpose or non-infringing.
 * See the Mozilla Public License v. 2.0 for more details.
 *
 * For more details, see http://www.mrtrix.org/.
 */

#ifndef __mrtrix_thread_h__
#define __mrtrix_thread_h__

#include <thread>
#include <future>
#include <mutex>

#include "debug.h"
#include "mrtrix.h"
#include "exception.h"

/** \defgroup thread_classes Multi-threading
 * \brief functions to provide support for multi-threading
 *
 * These functions and associated classes provide a simple interface for
 * multi-threading in MRtrix applications. Most of the low-level funtionality
 * relies on the C++11 `std::thread` API. MRtrix3 builds on this to add three
 * convenience methods:
 *
 * - [Thread::run()](@ref Thread::run()) to launch one or more worker threads;
 *
 * - [ThreadedLoop()](@ref image_thread_looping) to run an operation over all voxels
 *   in one or more images;
 *
 * - [Thread::run_queue()](@ref Thread::run_queue()) to run a pipeline,
 *   with one or more threads feeding data through to one or more other threads
 *   (potentially with further stages in the pipeline).
 *
 * These APIs provide simple and convenient ways of multi-threading, and should
 * be sufficient for the vast majority of applications.
 *
 * Please refer to the \ref multithreading page for an overview of
 * multi-threading in MRtrix.
 *
 * \sa Thread::run()
 * \sa threaded_loop
 * \sa Thread::run_queue()
 */

namespace MR
{
  namespace Thread
  {

    class __Backend { NOMEMALIGN
      public:
        __Backend();
        ~__Backend();

        static void register_thread () {
          std::lock_guard<std::mutex> lock (mutex);
          if (!backend)
            backend = new __Backend;
          ++backend->refcount;
        }
        static void unregister_thread () {
          assert (backend);
          std::lock_guard<std::mutex> lock (mutex);
          if (!(--backend->refcount)) {
            delete backend;
            backend = nullptr;
          }
        }

        static bool valid() { return backend; }

        static void thread_print_func (const std::string& msg);
        static void thread_report_to_user_func (const std::string& msg, int type);

        static void (*previous_print_func) (const std::string& msg);
        static void (*previous_report_to_user_func) (const std::string& msg, int type);

      protected:
        size_t refcount;

        static __Backend* backend;
        static std::mutex mutex;
    };


    namespace {

      class __thread_base { NOMEMALIGN
        public:
          __thread_base (const std::string& name = "unnamed") : name (name) { __Backend::register_thread(); }
          __thread_base (const __thread_base&) = delete;
          __thread_base (__thread_base&&) = default;
          ~__thread_base () { __Backend::unregister_thread(); }


        protected:
          const std::string name;
      };


      class __single_thread : public __thread_base { NOMEMALIGN
        public:
          template <class Functor>
            __single_thread (Functor&& functor, const std::string& name = "unnamed") :
            __thread_base (name) {
              DEBUG ("launching thread \"" + name + "\"...");
              using F = typename std::remove_reference<Functor>::type;
              thread = std::async (std::launch::async, &F::execute, &functor);
            }
          __single_thread (const __single_thread&) = delete;
          __single_thread (__single_thread&&) = default;

          bool finished () const
          {
            return thread.wait_for(std::chrono::microseconds(0)) == std::future_status::ready;
          }

          void wait () noexcept (false) {
            DEBUG ("waiting for completion of thread \"" + name + "\"...");
            thread.get();
            DEBUG ("thread \"" + name + "\" completed OK");
          }

          ~__single_thread () {
            if (thread.valid()) {
              try { wait(); }
              catch (Exception& E) { E.display(); }
            }
          }

        protected:
          std::future<void> thread;
      };


      template <class Functor>
        class __multi_thread : public __thread_base { NOMEMALIGN
          public:
            __multi_thread (Functor& functor, size_t nthreads, const std::string& name = "unnamed") :
              __thread_base (name), functors ( (nthreads>0 ? nthreads-1 : 0), functor) {
                DEBUG ("launching " + str (nthreads) + " threads \"" + name + "\"...");
                using F = typename std::remove_reference<Functor>::type;
                threads.reserve (nthreads);
                for (auto& f : functors)
                  threads.push_back (std::async (std::launch::async, &F::execute, &f));
                threads.push_back (std::async (std::launch::async, &F::execute, &functor));
              }

            __multi_thread (const __multi_thread&) = delete;
            __multi_thread (__multi_thread&&) = default;

            void wait () noexcept (false) {
              DEBUG ("waiting for completion of threads \"" + name + "\"...");
              bool exception_thrown = false;
              for (auto& t : threads) {
                if (!t.valid())
                  continue;
                try { t.get(); }
                catch (Exception& E) {
                  exception_thrown = true;
                  E.display();
                }
              }
              if (exception_thrown)
                throw Exception ("exception thrown from one or more threads \"" + name + "\"");
              DEBUG ("threads \"" + name + "\" completed OK");
            }

            bool finished () const {
              for (auto& t : threads)
                if (t.wait_for (std::chrono::microseconds(0)) != std::future_status::ready)
                  return false;
              return true;
            }

            bool any_valid () const {
              for (auto& t : threads)
                if (t.valid())
                  return true;
              return false;
            }

            ~__multi_thread () {
              if (any_valid()) {
                try { wait(); }
                catch (Exception& E) { E.display(); }
              }
            }
          protected:
            vector<std::future<void>> threads;
            vector<typename std::remove_reference<Functor>::type> functors;

        };


      template <class Functor>
        class __Multi { NOMEMALIGN
          public:
            __Multi (Functor& object, size_t number) : functor (object), num (number) { }
            __Multi (__Multi&& m) = default;
            template <class X> bool operator() (const X&) { assert (0); return false; }
            template <class X> bool operator() (X&) { assert (0); return false; }
            template <class X, class Y> bool operator() (const X&, Y&) { assert (0); return false; }
            typename std::remove_reference<Functor>::type& functor;
            size_t num;
        };

      template <class Functor>
        class __run { NOMEMALIGN
          public:
            using type = __single_thread;
            type operator() (Functor& functor, const std::string& name) {
              return { functor, name };
            }
        };

      template <class Functor>
        class __run<__Multi<Functor>> { NOMEMALIGN
          public:
            using type = __multi_thread<Functor>;
            type operator() (__Multi<Functor>& functor, const std::string& name) {
              return { functor.functor, functor.num, name };
            }
        };

    }



    /** \addtogroup thread_classes
     * @{ */

    /** \defgroup thread_basics Basic multi-threading primitives
     * \brief basic functions and classes to allow multi-threading
     *
     * These functions and classes mostly provide a thin wrapper around the
     * C++11 threads API. While they can be used as-is to develop
     * multi-threaded applications, in practice the \ref image_thread_looping
     * and \ref thread_queue APIs provide much more convenient and powerful
     * ways of developing robust and efficient applications.
     *
     * @{ */

    /*! the number of cores available for multi-threading, as specified in the
     * variable NumberOfThreads in the MRtrix configuration file, or set using
     * the -nthreads command-line option */
    size_t number_of_threads ();

    /*! provides information regarding whether the number of threads has been
     * initialised, set explicitly, or determined implicitly. This may affect
     * how particular algorithms choose to launch threads depending on the
     * presence of a user request. */
    enum class nthreads_t { UNINITIALISED, EXPLICIT, IMPLICIT };
    nthreads_t type_nthreads ();

    /*! the number of threads to execute for a particular task
     * if some higher-level process has already executed multiple threads,
     * do not want the lower-level process querying this function to also
     * generate a large number of threads; instead the lower-level function
     * should run explicitly single-threaded. */
    size_t threads_to_execute ();



    //! used to request multiple threads of the corresponding functor
    /*! This function is used in combination with Thread::run or
     * Thread::run_queue to request that the functor \a object be run in
     * parallel using \a number threads of execution (defaults to
     * Thread::threads_to_execute()).
     * \sa Thread::run()
     * \sa Thread::run_queue() */
    template <class Functor>
      inline __Multi<typename std::remove_reference<Functor>::type>
      multi (Functor&& functor, size_t nthreads = threads_to_execute())
      {
        return { functor, nthreads };
      }



    //! Execute the functor's execute method in a separate thread
    /*! Launch a thread by running the execute method of the object \a functor,
     * which should have the following prototype:
     * \code
     * class MyFunc {
     *   public:
     *     void execute ();
     * };
     * \endcode
     *
     * The thread is launched by the constructor, and the destructor will wait
     * for the thread to finish.  The lifetime of a thread launched via this
     * method is therefore restricted to the scope of the returned object. For
     * example:
     * \code
     * class MyFunctor {
     *   public:
     *     void execute () {
     *       ...
     *       // do something useful
     *       ...
     *     }
     * };
     *
     * void some_function () {
     *   MyFunctor func; // parameters can be passed to func in its constructor
     *
     *   // thread is launched as soon as my_thread is instantiated:
     *   auto my_thread = Thread::run (func, "my function");
     *   ...
     *   // do something else while my_thread is running
     *   ...
     *   // wait for my_thread to complete - this is necessary to catch
     *   // exceptions - see below
     *   my_thread.wait();
     * }
     * \endcode
     *
     * It is also possible to launch an array of threads in parallel, by
     * wrapping the functor into a call to Thread::multi(), as follows:
     * \code
     * ...
     * auto my_threads = Thread::run (Thread::multi (func), "my function");
     * ...
     * my_thread.wait();
     * ...
     * \endcode
     *
     * \par Exception handling
     *
     * Proper handling of exceptions in a multi-threaded context is
     * non-trivial, and in general you should take every precaution to prevent
     * threads from throwing exceptions. This means you should perform all
     * error checking within a single-threaded context, before starting
     * processing-intensive threads, so as to minimise the chances of anything
     * going wrong at that stage.
     *
     * In this implementation, the wait() function can be used to wait until
     * all threads have completed, at which point any exceptions thrown will be
     * displayed, and a futher exception re-thrown to allow the main
     * application to catch the error (this could be the same exception that
     * was originally thrown if a single thread was run). This means the
     * application will continue processing if any of the remaining threads
     * remain active, and it may be a while before the application itself is
     * allowed to handle the error appropriately. If this behaviour is not
     * appropriate, and you expect exceptions to be thrown occasionally, you
     * should take steps to handle these yourself (e.g. by setting / checking
     * some flag within your threads, etc.).
     *
     * \note while the wait() function will also be invoked in the destructor,
     * any exceptions thrown will be caught and \e not re-thrown (throwing in
     * the destructor is considered bad practice). This is to prevent undefined
     * behaviour (i.e. crashes) when multiple thread objects are launched
     * within the same scope, each of which might throw. In these cases, it is
     * best to explicitly call wait() for each of the objects returned by
     * Thread::run(), rather than relying on the destructor alone (note
     * Thread::Queue ThreadedLoop already do this).
     *
     * \sa Thread::multi()
     */
    template <class Functor>
      inline typename __run<Functor>::type run (Functor&& functor, const std::string& name = "unnamed")
      {
        return __run<typename std::remove_reference<Functor>::type>() (functor, name);
      }

    /** @} */
    /** @} */
  }
}

#endif