File: itkMultiThreaderBase.h

package info (click to toggle)
insighttoolkit5 5.4.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 704,588 kB
  • sloc: cpp: 784,579; ansic: 628,724; xml: 44,704; fortran: 34,250; python: 22,934; sh: 4,078; pascal: 2,636; lisp: 2,158; makefile: 461; yacc: 328; asm: 205; perl: 203; lex: 146; tcl: 132; javascript: 98; csh: 81
file content (504 lines) | stat: -rw-r--r-- 18,840 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
/*=========================================================================
 *
 *  Copyright NumFOCUS
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *         https://www.apache.org/licenses/LICENSE-2.0.txt
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 *=========================================================================*/
/*=========================================================================
 *
 *  Portions of this file are subject to the VTK Toolkit Version 3 copyright.
 *
 *  Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
 *
 *  For complete copyright, license and disclaimer of warranty information
 *  please refer to the NOTICE file at the top of the ITK source tree.
 *
 *=========================================================================*/
#ifndef itkMultiThreaderBase_h
#define itkMultiThreaderBase_h

#include "itkObject.h"
#include "itkThreadSupport.h"
#include "itkObjectFactory.h"
#include "itkIntTypes.h"
#include "itkImageRegion.h"
#include "itkImageIORegion.h"
#include "itkSingletonMacro.h"
#include <atomic>
#include <functional>
#include <thread>
#include "itkProgressReporter.h"


namespace itk
{

/** \class MultiThreaderBaseEnums
 *
 * \brief enums for MultiThreaderBase
 *
 * \ingroup ITKCommon
 */
class MultiThreaderBaseEnums
{
public:
  /** \class Threader
   * \ingroup ITKCommon
   * Currently supported types of multi-threader implementations.
   * Last will change with additional implementations.
   */
  enum class Threader : int8_t
  {
    Platform = 0,
    First = Platform,
    Pool,
    TBB,
    Last = TBB,
    Unknown = -1
  };

  /** \class ThreadExitCode
   * \ingroup ITKCommon
   */
  enum class ThreadExitCode : uint8_t
  {
    SUCCESS,
    ITK_EXCEPTION,
    ITK_PROCESS_ABORTED_EXCEPTION,
    STD_EXCEPTION,
    UNKNOWN
  };
};
// Define how to print enumeration
extern ITKCommon_EXPORT std::ostream &
                        operator<<(std::ostream & out, const MultiThreaderBaseEnums::Threader value);
extern ITKCommon_EXPORT std::ostream &
                        operator<<(std::ostream & out, const MultiThreaderBaseEnums::ThreadExitCode value);

/** \class MultiThreaderBase
 * \brief A class for performing multithreaded execution
 *
 * Multithreaders are a class hierarchy that provides support for
 * multithreaded execution by abstracting away platform-specific
 * details. This class can be used to execute a single
 * method on multiple threads or to parallelize an operation over a
 * given image region or array.
 *
 * \ingroup OSSystemObjects
 *
 * \ingroup ITKCommon
 */

struct MultiThreaderBaseGlobals;
class ProcessObject;

class ITKCommon_EXPORT MultiThreaderBase : public Object
{
public:
  ITK_DISALLOW_COPY_AND_MOVE(MultiThreaderBase);

  /** Standard class type aliases. */
  using Self = MultiThreaderBase;
  using Superclass = Object;
  using Pointer = SmartPointer<Self>;
  using ConstPointer = SmartPointer<const Self>;

  /** Method for creation through the object factory. */
  static Pointer
  New();

  /** \see LightObject::GetNameOfClass() */
  itkOverrideGetNameOfClassMacro(MultiThreaderBase);

  /** Get/Set the number of threads to use. It will be clamped to the range
   * [ 1, m_GlobalMaximumNumberOfThreads ], so the caller of this method should
   * check that the requested number of threads was accepted. */
  virtual void
  SetMaximumNumberOfThreads(ThreadIdType numberOfThreads);
  itkGetConstMacro(MaximumNumberOfThreads, ThreadIdType);

  /** Get/Set the number of work units to create. It might be clamped to the range
   * [ 1, SomeMaximumNumber ], so the caller of this method should
   * check that the requested number of work units was accepted. */
  virtual void
  SetNumberOfWorkUnits(ThreadIdType numberOfWorkUnits);
  itkGetConstMacro(NumberOfWorkUnits, ThreadIdType);

  virtual void
  SetUpdateProgress(bool updates);
  itkGetConstMacro(UpdateProgress, bool);

  /** Set/Get the maximum number of threads to use when multithreading.  It
   * will be clamped to the range [ 1, ITK_MAX_THREADS ] because several arrays
   * are already statically allocated using the ITK_MAX_THREADS number.
   * Therefore the caller of this method should check that the requested number
   * of threads was accepted. */
  static void
  SetGlobalMaximumNumberOfThreads(ThreadIdType val);
  static ThreadIdType
  GetGlobalMaximumNumberOfThreads();

  /** Set/Get whether to use the to use the thread pool
   * implementation or the spawning implementation of
   * starting threads.
   *
   * Deprecated: use Get/Set GlobalDefaultThreader. */
  itkLegacyMacro(static void SetGlobalDefaultUseThreadPool(const bool GlobalDefaultUseThreadPool));
  itkLegacyMacro(static bool GetGlobalDefaultUseThreadPool());

  using ThreaderEnum = MultiThreaderBaseEnums::Threader;
#if !defined(ITK_LEGACY_REMOVE)
  using ThreaderType = ThreaderEnum;
  static constexpr ThreaderEnum Platform = ThreaderEnum::Platform;
  static constexpr ThreaderEnum First = ThreaderEnum::First;
  static constexpr ThreaderEnum Pool = ThreaderEnum::Pool;
  static constexpr ThreaderEnum TBB = ThreaderEnum::TBB;
  static constexpr ThreaderEnum Last = ThreaderEnum::Last;
  static constexpr ThreaderEnum Unknown = ThreaderEnum::Unknown;
#endif

  /** Convert a threader name into its enum type. */
  static ThreaderEnum
  ThreaderTypeFromString(std::string threaderString);

  /** Convert a threader enum type into a string for displaying or logging. */
  static std::string
  ThreaderTypeToString(ThreaderEnum threader)
  {
    switch (threader)
    {
      case ThreaderEnum::Platform:
        return "Platform";
      case ThreaderEnum::Pool:
        return "Pool";
      case ThreaderEnum::TBB:
        return "TBB";
      case ThreaderEnum::Unknown:
      default:
        return "Unknown";
    }
  }

  /** Set/Get whether the default multi-threader implementation.
   *
   * The default multi-threader type is picked up from ITK_GLOBAL_DEFAULT_THREADER
   * environment variable. Example ITK_GLOBAL_DEFAULT_THREADER=TBB
   * A deprecated ITK_USE_THREADPOOL environment variable is also examined,
   * but it can only choose Pool or Platform multi-threader.
   * Platform multi-threader should be avoided,
   * as it introduces the biggest overhead.
   *
   * If the SetGlobalDefaultThreaderType API is ever used by the developer,
   * the developer's choice is respected over the environment variables. */
  static void
  SetGlobalDefaultThreader(ThreaderEnum threaderType);
  static ThreaderEnum
  GetGlobalDefaultThreader();

  /** Set/Get the value which is used to initialize the NumberOfThreads in the
   * constructor.  It will be clamped to the range [1, m_GlobalMaximumNumberOfThreads ].
   * Therefore the caller of this method should check that the requested number
   * of threads was accepted. */
  static void
  SetGlobalDefaultNumberOfThreads(ThreadIdType val);
  static ThreadIdType
  GetGlobalDefaultNumberOfThreads();

#if !defined(ITK_LEGACY_REMOVE)
  /** Get/Set the number of threads to use.
   * DEPRECATED! Use WorkUnits and MaximumNumberOfThreads instead. */
  itkLegacyMacro(virtual void SetNumberOfThreads(ThreadIdType numberOfThreads))
  {
    this->SetMaximumNumberOfThreads(numberOfThreads);
    this->SetNumberOfWorkUnits(this->GetMaximumNumberOfThreads()); // Might be clamped
  }
  itkLegacyMacro(virtual ThreadIdType GetNumberOfThreads()) { return this->GetNumberOfWorkUnits(); }

  /** This is the structure that is passed to the thread that is
   * created from the SingleMethodExecute. It is passed in as a void *,
   * and it is up to the method to cast correctly and extract the information.
   * The ThreadID is a number between 0 and NumberOfThreads-1 that
   * indicates the id of this thread. The UserData is the
   * (void *)arg passed into the SetSingleMethod.
   *
   * DEPRECATED! Use WorkUnitInfo instead. */
  // clang-format off
ITK_GCC_PRAGMA_DIAG_PUSH()
ITK_GCC_PRAGMA_DIAG(ignored "-Wattributes")
INTEL_PRAGMA_WARN_PUSH
INTEL_SUPPRESS_warning_1292
  // clang-format on
#  ifdef ITK_LEGACY_SILENT
  struct ThreadInfoStruct
#  else
  struct [[deprecated("Use WorkUnitInfo, ThreadInfoStruct is deprecated since ITK 5.0")]] ThreadInfoStruct
#  endif
    // clang-format off
INTEL_PRAGMA_WARN_POP
  // clang-format on
  {
    ThreadIdType       ThreadID;
    ThreadIdType       NumberOfThreads;
    void *             UserData;
    ThreadFunctionType ThreadFunction;
    enum
    {
      SUCCESS,
      ITK_EXCEPTION,
      ITK_PROCESS_ABORTED_EXCEPTION,
      STD_EXCEPTION,
      UNKNOWN
    } ThreadExitCode;
  };
  // clang-format off
ITK_GCC_PRAGMA_DIAG_POP()
  // clang-format on
#endif // ITK_LEGACY_REMOVE

  /** This is the structure that is passed to the thread that is
   * created from the SingleMethodExecute. It is passed in as a void *,
   * and it is up to the method to cast correctly and extract the information.
   * The WorkUnitID is a number between 0 and NumberOfWorkUnits-1 that
   * indicates the id of this work unit. The UserData is the
   * (void *)arg passed into the SetSingleMethod. */
  struct WorkUnitInfo
  {
    ThreadIdType       WorkUnitID;
    ThreadIdType       NumberOfWorkUnits;
    void *             UserData;
    ThreadFunctionType ThreadFunction;
    using ThreadExitCodeEnum = MultiThreaderBaseEnums::ThreadExitCode;
    ThreadExitCodeEnum ThreadExitCode;
#if !defined(ITK_LEGACY_REMOVE)
    using ThreadExitCodeType = ThreadExitCodeEnum;
    static constexpr ThreadExitCodeEnum SUCCESS = ThreadExitCodeEnum::SUCCESS;
    static constexpr ThreadExitCodeEnum ITK_EXCEPTION = ThreadExitCodeEnum::ITK_EXCEPTION;
    static constexpr ThreadExitCodeEnum ITK_PROCESS_ABORTED_EXCEPTION =
      ThreadExitCodeEnum::ITK_PROCESS_ABORTED_EXCEPTION;
    static constexpr ThreadExitCodeEnum STD_EXCEPTION = ThreadExitCodeEnum::STD_EXCEPTION;
    static constexpr ThreadExitCodeEnum UNKNOWN = ThreadExitCodeEnum::UNKNOWN;
#endif
  };

  /** Execute the SingleMethod (as define by SetSingleMethod) using
   * m_NumberOfWorkUnits threads. As a side effect the m_NumberOfWorkUnits will be
   * checked against the current m_GlobalMaximumNumberOfThreads and clamped if
   * necessary. */
  virtual void
  SingleMethodExecute() = 0;

  /** Set the SingleMethod to f() and the UserData field of the
   * WorkUnitInfo that is passed to it will be data. This method
   * must be of type itkThreadFunctionType and must take a single argument of
   * type void *. */
  virtual void
  SetSingleMethod(ThreadFunctionType, void * data) = 0;

  /** Set the method and the user data, by SetSingleMethod, and executes the method, by SingleMethodExecute. */
  void
  SetSingleMethodAndExecute(ThreadFunctionType func, void * data);

#ifndef ITK_FUTURE_LEGACY_REMOVE
  // `TemplatedThreadingFunctorType` was previously used to declare the `funcP` parameter of `ParallelizeImageRegion`
  // and `ParallelizeImageRegionRestrictDirection` template member functions.
  template <unsigned int VDimension>
  using TemplatedThreadingFunctorType [[deprecated]] = std::function<void(const ImageRegion<VDimension> &)>;
#endif

  using ThreadingFunctorType = std::function<void(const IndexValueType index[], const SizeValueType size[])>;
  using ArrayThreadingFunctorType = std::function<void(SizeValueType)>;

  /** Parallelize an operation over an array. If filter argument is not nullptr,
   * this function will update its progress as each index is completed.
   *
   * This implementation simply delegates parallelization to the old interface
   * SetSingleMethod+SingleMethodExecute. This method is meant to be overloaded! */
  virtual void
  ParallelizeArray(SizeValueType             firstIndex,
                   SizeValueType             lastIndexPlus1,
                   ArrayThreadingFunctorType aFunc,
                   ProcessObject *           filter);

  /** Break up region into smaller chunks, and call the user-specified function or function object `funcP` for each
   * chunk, having the region of the chunk as argument. The type of such a chuck region is `ImageRegion<VDimension>`.
   * Each such `funcP(region)` call must be thread-safe.
   * If filter argument is not nullptr, this function will update its progress
   * as each work unit is completed. Delegates work to non-templated version. */
  template <unsigned int VDimension, typename TFunction>
  ITK_TEMPLATE_EXPORT void
  ParallelizeImageRegion(const ImageRegion<VDimension> & requestedRegion, TFunction funcP, ProcessObject * filter)
  {
    this->ParallelizeImageRegion(
      VDimension,
      requestedRegion.GetIndex().m_InternalArray,
      requestedRegion.GetSize().m_InternalArray,
      [&funcP](const IndexValueType index[], const SizeValueType size[]) {
        ImageRegion<VDimension> region;
        for (unsigned int d = 0; d < VDimension; ++d)
        {
          region.SetIndex(d, index[d]);
          region.SetSize(d, size[d]);
        }
        funcP(region);
      },
      filter);
  }

  /** Similar to ParallelizeImageRegion, but do not split the region along one
   * of the directions. If VDimension is 1, restrictedDirection is ignored
   * and no parallelization occurs. */
  template <unsigned int VDimension, typename TFunction>
  ITK_TEMPLATE_EXPORT void
  ParallelizeImageRegionRestrictDirection(unsigned int                    restrictedDirection,
                                          const ImageRegion<VDimension> & requestedRegion,
                                          TFunction                       funcP,
                                          ProcessObject *                 filter)
  {
    if constexpr (VDimension <= 1) // Cannot split, no parallelization
    {

      ProgressReporter progress(filter, 0, requestedRegion.GetNumberOfPixels());
      funcP(requestedRegion);
    }
    else // Can split, parallelize!
    {
      constexpr unsigned int SplitDimension = VDimension - 1;

      Index<SplitDimension> splitIndex{};
      Size<SplitDimension>  splitSize{};
      for (unsigned int splitDimension = 0, dimension = 0; dimension < VDimension; ++dimension)
      {
        if (dimension != restrictedDirection)
        {
          splitIndex[splitDimension] = requestedRegion.GetIndex(dimension);
          splitSize[splitDimension] = requestedRegion.GetSize(dimension);
          ++splitDimension;
        }
      }

      this->ParallelizeImageRegion(
        SplitDimension,
        splitIndex.m_InternalArray,
        splitSize.m_InternalArray,
        [restrictedDirection, &requestedRegion, &funcP](const IndexValueType index[], const SizeValueType size[]) {
          ImageRegion<VDimension> restrictedRequestedRegion;
          restrictedRequestedRegion.SetIndex(restrictedDirection, requestedRegion.GetIndex(restrictedDirection));
          restrictedRequestedRegion.SetSize(restrictedDirection, requestedRegion.GetSize(restrictedDirection));
          for (unsigned int splitDimension = 0, dimension = 0; dimension < VDimension; ++dimension)
          {
            if (dimension != restrictedDirection)
            {
              restrictedRequestedRegion.SetIndex(dimension, index[splitDimension]);
              restrictedRequestedRegion.SetSize(dimension, size[splitDimension]);
              ++splitDimension;
            }
          }
          funcP(restrictedRequestedRegion);
        },
        filter);
    }
  }

  /** Break up region into smaller chunks, and call the function with chunks as parameters.
   *  This overload does the actual work and should be implemented by derived classes. */
  virtual void
  ParallelizeImageRegion(unsigned int         dimension,
                         const IndexValueType index[],
                         const SizeValueType  size[],
                         ThreadingFunctorType funcP,
                         ProcessObject *      filter);

protected:
  MultiThreaderBase();
  ~MultiThreaderBase() override;
  void
  PrintSelf(std::ostream & os, Indent indent) const override;

  struct ArrayCallback
  {
    ArrayThreadingFunctorType functor;
    const SizeValueType       firstIndex;
    const SizeValueType       lastIndexPlus1;
    ProcessObject *           filter;
  };

  static ITK_THREAD_RETURN_FUNCTION_CALL_CONVENTION
  ParallelizeArrayHelper(void * arg);

  struct RegionAndCallback
  {
    ThreadingFunctorType   functor;
    unsigned int           dimension;
    const IndexValueType * index;
    const SizeValueType *  size;
    ProcessObject *        filter;
  };

  static ITK_THREAD_RETURN_FUNCTION_CALL_CONVENTION
  ParallelizeImageRegionHelper(void * arg);

  /** The number of work units to create. */
  ThreadIdType m_NumberOfWorkUnits{};

  /** The number of threads to use.
   *  The m_MaximumNumberOfThreads must always be less than or equal to
   *  the m_GlobalMaximumNumberOfThreads before it is used during the execution
   *  of a threaded method. Its value is initialized to
   *  m_GlobalDefaultNumberOfThreads at construction time. Its value is clamped
   *  to the current m_GlobalMaximumNumberOfThreads in the
   *  SingleMethodExecute() method.
   */
  ThreadIdType m_MaximumNumberOfThreads{};

  /** Static function used as a "proxy callback" by multi-threaders.  The
   * threading library will call this routine for each thread, which
   * will delegate the control to the prescribed SingleMethod. This
   * routine acts as an intermediary between the multi-threaders and the
   * user supplied callback (SingleMethod) in order to catch any
   * exceptions thrown by the threads. */
  static ITK_THREAD_RETURN_FUNCTION_CALL_CONVENTION
  SingleMethodProxy(void * arg);

  /** The method to invoke. */
  ThreadFunctionType m_SingleMethod{ nullptr };

  /** The data to be passed as argument. */
  void * m_SingleData{ nullptr };

private:
  static void
  SetGlobalDefaultThreaderPrivate(ThreaderEnum threaderType);
  static ThreaderEnum
  GetGlobalDefaultThreaderPrivate();

  /** Only used to synchronize the global variable across static libraries.*/
  itkGetGlobalDeclarationMacro(MultiThreaderBaseGlobals, PimplGlobals);

  std::atomic<bool> m_UpdateProgress{ true };

  static MultiThreaderBaseGlobals * m_PimplGlobals;
  /** Friends of Multithreader.
   * ProcessObject is a friend so that it can call PrintSelf() on its
   * Multithreader. */
  friend class ProcessObject;

  /** Platform specific number of threads */
  static ThreadIdType
  GetGlobalDefaultNumberOfThreadsByPlatform();
};


} // namespace itk

#endif