/* Copyright (c) 2008-2022 the MRtrix3 contributors.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Covered Software is provided under this License on an "as is"
* basis, without warranty of any kind, either expressed, implied, or
* statutory, including, without limitation, warranties that the
* Covered Software is free of defects, merchantable, fit for a
* particular purpose or non-infringing.
* See the Mozilla Public License v. 2.0 for more details.
*
* For more details, see http://www.mrtrix.org/.
*/
#ifndef __mrtrix_thread_h__
#define __mrtrix_thread_h__
#include <thread>
#include <future>
#include <mutex>
#include "debug.h"
#include "mrtrix.h"
#include "exception.h"
/** \defgroup thread_classes Multi-threading
* \brief functions to provide support for multi-threading
*
* These functions and associated classes provide a simple interface for
* multi-threading in MRtrix applications. Most of the low-level functionality
* relies on the C++11 `std::thread` API. MRtrix3 builds on this to add three
* convenience methods:
*
* - [Thread::run()](@ref Thread::run()) to launch one or more worker threads;
*
* - [ThreadedLoop()](@ref image_thread_looping) to run an operation over all voxels
* in one or more images;
*
* - [Thread::run_queue()](@ref Thread::run_queue()) to run a pipeline,
* with one or more threads feeding data through to one or more other threads
* (potentially with further stages in the pipeline).
*
* These APIs provide simple and convenient ways of multi-threading, and should
* be sufficient for the vast majority of applications.
*
* Please refer to the \ref multithreading page for an overview of
* multi-threading in MRtrix.
*
* \sa Thread::run()
* \sa threaded_loop
* \sa Thread::run_queue()
*/
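// As a rough sketch of the pipeline use case mentioned above (illustrative
// only: MySource, MyItem, MyPipe and MySink are hypothetical functors, and the
// actual Thread::run_queue() overloads are declared in thread_queue.h):
//
//   MySource source;
//   MyPipe pipe;
//   MySink sink;
//   // one source thread feeding several pipe threads feeding one sink thread:
//   Thread::run_queue (source, MyItem(), Thread::multi (pipe), MyItem(), sink);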
namespace MR
{
namespace Thread
{
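// Internal book-keeping for running threads: __Backend reference-counts the
// threads currently registered, and (as the function-pointer members below
// suggest) swaps in thread-safe versions of the print / report_to_user
// handlers while any thread is active, restoring the previous handlers once
// the last thread unregisters.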
class __Backend { NOMEMALIGN
public:
__Backend();
~__Backend();
static void register_thread () {
std::lock_guard<std::mutex> lock (mutex);
if (!backend)
backend = new __Backend;
++backend->refcount;
}
static void unregister_thread () {
assert (backend);
std::lock_guard<std::mutex> lock (mutex);
if (!(--backend->refcount)) {
delete backend;
backend = nullptr;
}
}
static bool valid() { return backend; }
static void thread_print_func (const std::string& msg);
static void thread_report_to_user_func (const std::string& msg, int type);
static void (*previous_print_func) (const std::string& msg);
static void (*previous_report_to_user_func) (const std::string& msg, int type);
protected:
size_t refcount;
static __Backend* backend;
static std::mutex mutex;
};
namespace {
class __thread_base { NOMEMALIGN
public:
__thread_base (const std::string& name = "unnamed") : name (name) { __Backend::register_thread(); }
__thread_base (const __thread_base&) = delete;
__thread_base (__thread_base&&) = default;
~__thread_base () { __Backend::unregister_thread(); }
protected:
const std::string name;
};
class __single_thread : public __thread_base { NOMEMALIGN
public:
template <class Functor>
__single_thread (Functor&& functor, const std::string& name = "unnamed") :
__thread_base (name) {
DEBUG ("launching thread \"" + name + "\"...");
using F = typename std::remove_reference<Functor>::type;
thread = std::async (std::launch::async, &F::execute, &functor);
}
__single_thread (const __single_thread&) = delete;
__single_thread (__single_thread&&) = default;
bool finished () const
{
return thread.wait_for(std::chrono::microseconds(0)) == std::future_status::ready;
}
void wait () noexcept (false) {
DEBUG ("waiting for completion of thread \"" + name + "\"...");
thread.get();
DEBUG ("thread \"" + name + "\" completed OK");
}
~__single_thread () {
if (thread.valid()) {
try { wait(); }
catch (Exception& E) { E.display(); }
}
}
protected:
std::future<void> thread;
};
template <class Functor>
class __multi_thread : public __thread_base { NOMEMALIGN
public:
__multi_thread (Functor& functor, size_t nthreads, const std::string& name = "unnamed") :
__thread_base (name), functors ( (nthreads>0 ? nthreads-1 : 0), functor) {
DEBUG ("launching " + str (nthreads) + " threads \"" + name + "\"...");
using F = typename std::remove_reference<Functor>::type;
threads.reserve (nthreads);
for (auto& f : functors)
threads.push_back (std::async (std::launch::async, &F::execute, &f));
threads.push_back (std::async (std::launch::async, &F::execute, &functor));
}
__multi_thread (const __multi_thread&) = delete;
__multi_thread (__multi_thread&&) = default;
void wait () noexcept (false) {
DEBUG ("waiting for completion of threads \"" + name + "\"...");
bool exception_thrown = false;
for (auto& t : threads) {
if (!t.valid())
continue;
try { t.get(); }
catch (Exception& E) {
exception_thrown = true;
E.display();
}
}
if (exception_thrown)
throw Exception ("exception thrown from one or more threads \"" + name + "\"");
DEBUG ("threads \"" + name + "\" completed OK");
}
bool finished () const {
for (auto& t : threads)
if (t.wait_for (std::chrono::microseconds(0)) != std::future_status::ready)
return false;
return true;
}
bool any_valid () const {
for (auto& t : threads)
if (t.valid())
return true;
return false;
}
~__multi_thread () {
if (any_valid()) {
try { wait(); }
catch (Exception& E) { E.display(); }
}
}
protected:
vector<std::future<void>> threads;
vector<typename std::remove_reference<Functor>::type> functors;
};
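// Lightweight wrapper returned by Thread::multi(): it stores a reference to
// the functor along with the number of threads requested. The operator()
// overloads are never intended to be invoked (hence the assert(0)); they
// appear to exist only so that a __Multi object can stand in wherever a plain
// functor is expected by the surrounding template machinery.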
template <class Functor>
class __Multi { NOMEMALIGN
public:
__Multi (Functor& object, size_t number) : functor (object), num (number) { }
__Multi (__Multi&& m) = default;
template <class X> bool operator() (const X&) { assert (0); return false; }
template <class X> bool operator() (X&) { assert (0); return false; }
template <class X, class Y> bool operator() (const X&, Y&) { assert (0); return false; }
typename std::remove_reference<Functor>::type& functor;
size_t num;
};
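// Trait-like helpers used by Thread::run() below: a plain functor maps to a
// single __single_thread, while a __Multi-wrapped functor maps to a
// __multi_thread running the requested number of copies.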
template <class Functor>
class __run { NOMEMALIGN
public:
using type = __single_thread;
type operator() (Functor& functor, const std::string& name) {
return { functor, name };
}
};
template <class Functor>
class __run<__Multi<Functor>> { NOMEMALIGN
public:
using type = __multi_thread<Functor>;
type operator() (__Multi<Functor>& functor, const std::string& name) {
return { functor.functor, functor.num, name };
}
};
}
/** \addtogroup thread_classes
* @{ */
/** \defgroup thread_basics Basic multi-threading primitives
* \brief basic functions and classes to allow multi-threading
*
* These functions and classes mostly provide a thin wrapper around the
* C++11 threads API. While they can be used as-is to develop
* multi-threaded applications, in practice the \ref image_thread_looping
* and \ref thread_queue APIs provide much more convenient and powerful
* ways of developing robust and efficient applications.
*
* @{ */
/*! the number of cores available for multi-threading, as specified in the
* variable NumberOfThreads in the MRtrix configuration file, or set using
* the -nthreads command-line option */
size_t number_of_threads ();
/*! provides information regarding whether the number of threads has been
* initialised, set explicitly, or determined implicitly. This may affect
* how particular algorithms choose to launch threads depending on the
* presence of a user request. */
enum class nthreads_t { UNINITIALISED, EXPLICIT, IMPLICIT };
nthreads_t type_nthreads ();
/*! the number of threads to execute for a particular task.
* If some higher-level process has already launched multiple threads, we do
* not want a lower-level process that queries this function to spawn a large
* number of threads of its own; instead, the lower-level function should run
* explicitly single-threaded. */
size_t threads_to_execute ();
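// A minimal sketch of how these might be queried (illustrative only:
// process_multithreaded() and process_singlethreaded() are hypothetical
// helpers, and the exact policy is up to the calling code):
//
//   if (Thread::threads_to_execute() > 1)
//     process_multithreaded (Thread::threads_to_execute());
//   else
//     process_singlethreaded();
//
//   // an explicit user request can also be detected directly:
//   const bool user_requested = (Thread::type_nthreads() == Thread::nthreads_t::EXPLICIT);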
//! used to request multiple threads of the corresponding functor
/*! This function is used in combination with Thread::run or
* Thread::run_queue to request that the supplied \a functor be run in
* parallel using \a nthreads threads of execution (defaults to
* Thread::threads_to_execute()).
* \sa Thread::run()
* \sa Thread::run_queue() */
template <class Functor>
inline __Multi<typename std::remove_reference<Functor>::type>
multi (Functor&& functor, size_t nthreads = threads_to_execute())
{
return { functor, nthreads };
}
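// For example (a sketch, assuming MyFunctor provides the execute() method
// described in the Thread::run() documentation below):
//
//   MyFunctor func;
//   // run copies of func across the default number of threads:
//   auto threads = Thread::run (Thread::multi (func), "workers");
//   threads.wait();
//
//   // or request a specific number of threads:
//   auto two_threads = Thread::run (Thread::multi (func, 2), "two workers");
//   two_threads.wait();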
//! Execute the functor's execute method in a separate thread
/*! Launch a thread by running the execute method of the object \a functor,
* which should have the following prototype:
* \code
* class MyFunc {
* public:
* void execute ();
* };
* \endcode
*
* The thread is launched by the constructor, and the destructor will wait
* for the thread to finish. The lifetime of a thread launched via this
* method is therefore restricted to the scope of the returned object. For
* example:
* \code
* class MyFunctor {
* public:
* void execute () {
* ...
* // do something useful
* ...
* }
* };
*
* void some_function () {
* MyFunctor func; // parameters can be passed to func in its constructor
*
* // thread is launched as soon as my_thread is instantiated:
* auto my_thread = Thread::run (func, "my function");
* ...
* // do something else while my_thread is running
* ...
* // wait for my_thread to complete - this is necessary to catch
* // exceptions - see below
* my_thread.wait();
* }
* \endcode
*
* It is also possible to launch an array of threads in parallel, by
* wrapping the functor into a call to Thread::multi(), as follows:
* \code
* ...
* auto my_threads = Thread::run (Thread::multi (func), "my function");
* ...
* my_threads.wait();
* ...
* \endcode
*
* \par Exception handling
*
* Proper handling of exceptions in a multi-threaded context is
* non-trivial, and in general you should take every precaution to prevent
* threads from throwing exceptions. This means you should perform all
* error checking within a single-threaded context, before starting
* processing-intensive threads, so as to minimise the chances of anything
* going wrong at that stage.
*
* In this implementation, the wait() function can be used to wait until
* all threads have completed, at which point any exceptions thrown will be
* displayed, and a further exception re-thrown to allow the main
* application to catch the error (this could be the same exception that
* was originally thrown if a single thread was run). This means that
* processing will continue for as long as any of the remaining threads
* remain active, and it may be a while before the application itself is
* able to handle the error appropriately. If this behaviour is not
* appropriate, and you expect exceptions to be thrown occasionally, you
* should take steps to handle these yourself (e.g. by setting / checking
* some flag within your threads, etc.).
*
* \note while the wait() function will also be invoked in the destructor,
* any exceptions thrown will be caught and \e not re-thrown (throwing in
* the destructor is considered bad practice). This is to prevent undefined
* behaviour (i.e. crashes) when multiple thread objects are launched
* within the same scope, each of which might throw. In these cases, it is
* best to explicitly call wait() for each of the objects returned by
* Thread::run(), rather than relying on the destructor alone (note
* that Thread::Queue and ThreadedLoop already do this).
*
* \sa Thread::multi()
*/
template <class Functor>
inline typename __run<Functor>::type run (Functor&& functor, const std::string& name = "unnamed")
{
return __run<typename std::remove_reference<Functor>::type>() (functor, name);
}
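// A minimal sketch of catching an error reported from a worker thread, as
// discussed in the exception handling notes above (MyFunctor is hypothetical;
// Exception is the MRtrix exception class used throughout this header):
//
//   MyFunctor func;
//   auto threads = Thread::run (Thread::multi (func), "workers");
//   try {
//     threads.wait();
//   }
//   catch (Exception& E) {
//     E.display();
//     // decide whether to recover or abort
//   }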
/** @} */
/** @} */
}
}
#endif