File: cmtkThreadPoolThreads.h

package info (click to toggle)
cmtk 3.3.1p2%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,524 kB
  • sloc: cpp: 87,098; ansic: 23,347; sh: 3,896; xml: 1,551; perl: 707; makefile: 334
file content (238 lines) | stat: -rw-r--r-- 9,337 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
/*
//
//  Copyright 1997-2009 Torsten Rohlfing
//
//  Copyright 2004-2011 SRI International
//
//  This file is part of the Computational Morphometry Toolkit.
//
//  http://www.nitrc.org/projects/cmtk/
//
//  The Computational Morphometry Toolkit is free software: you can
//  redistribute it and/or modify it under the terms of the GNU General Public
//  License as published by the Free Software Foundation, either version 3 of
//  the License, or (at your option) any later version.
//
//  The Computational Morphometry Toolkit is distributed in the hope that it
//  will be useful, but WITHOUT ANY WARRANTY; without even the implied
//  warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License along
//  with the Computational Morphometry Toolkit.  If not, see
//  <http://www.gnu.org/licenses/>.
//
//  $Revision: 5436 $
//
//  $LastChangedDate: 2018-12-10 19:01:20 -0800 (Mon, 10 Dec 2018) $
//
//  $LastChangedBy: torstenrohlfing $
//
*/

#ifndef __cmtkThreadPoolThreads_h_included_
#define __cmtkThreadPoolThreads_h_included_

#include <cmtkconfig.h>

#include <System/cmtkCannotBeCopied.h>

#include <System/cmtkThreadSystemTypes.h>
#include <System/cmtkThreadSemaphore.h>
#include <System/cmtkMutexLock.h>
#include <System/cmtkSmartPtr.h>

#include <vector>

namespace
cmtk
{

/** \addtogroup System */
//@{
/** Class that provides a pool of continuously running threads that can be used for reducing overhead in SMP computations.
 *
 * Every instance of this class starts a pool of threads upon initialization, which can later be used to execute arbitrary tasks.
 * The threads are held via a semaphore, which is used to signal the availability of an arbitrary number of tasks. Once the
 * task semaphores have been posted, the calling function waits for the running threads to signal back the same number of
 * completed tasks via a second semaphore.
 *
 * The main advantages of this thread execution framework are:
 *
 * 1. On platforms with inefficient, slow thread creation and joining (Win32), the use and re-use of a persistent set of threads
 * greatly improves run-time efficiency and reduces overhead.
 *
 * 2. A single computation can be broken into more tasks than there are threads running, i.e., more tasks than there are CPUs
 * available. This allows for load balancing, because tasks that complete faster than others will free the executing thread, which
 * is then available to process the next available task without waiting for any other threads.
 *
 * This class provides a global thread pool, which can (and should) be shared by all computations in a process. Creating additional
 * thread pool instances should hardly ever be necessary.
 *
 * To run tasks on the global thread pool, simply create a std::vector that contains a parameter block for each task. The size of the
 * vector also determines the number of tasks to run. For example, the thread parameter could simply be a pointer to the current
 * instance ("this") of a class that acts as the client that requests a parallel computation:
 *
 *\code
 * #include <vector>
 * #include <System/cmtkThreadPool.h>
 *
 * class ComputationClass
 * {
 * public:
 *   typedef ComputationClass Self;
 *
 *   void ComputeUsingSMP()
 *   {
 *     // run approximately four times as many tasks as there are threads (only one task for single thread)
 *     const size_t numberOfTasks = 4 * cmtk::ThreadPool::GetGlobalThreadPool().GetNumberOfThreads() - 3;
 *
 *     std::vector<Self*> taskParameters( numberOfTasks, this );
 *     cmtk::ThreadPool::GetGlobalThreadPool().Run( ComputeTask, taskParameters );
 *   }
 *
 *   static void ComputeTask( void *const arg, const size_t taskIdx, const size_t taskCnt, const size_t threadIdx, const size_t threadCnt )
 *   {
 *     Self* Caller = static_cast<Self*>( arg );
 *     // more things to do for "Caller"
 *     // taskIdx is the index of this task; taskCnt is the total number of tasks. These two determine what part of the total work must be done.
 *     // threadIdx is the index of the "physical" thread out of threadCnt threads that are running in this pool. If temporary memory is allocated
 *     // for this function, then threadIdx can be used to index this temporary storage, thus allowing us to get by with threadCnt many spaces,
 *     // rather than taskCnt many, which is usually much larger.
 *   }
 * };
 *\endcode
 */
class ThreadPoolThreads :
  /// Make class uncopyable via inheritance.
  private CannotBeCopied
{
public:
  /// This class.
  typedef ThreadPoolThreads Self;

  /// Smart pointer to an instance of this class.
  typedef SmartPointer<Self> SmartPtr;

  /** Task function: this is the interface for the functions called by the pooled threads to do the actual work.
   * The task function receives five parameters: a) a pointer to its parameter block, b) the index of the task,
   * c) the number of tasks, d) the index of the thread within the pool that is calling the task, and e) the
   * number of threads in the pool. Whereas the function should use b) and c) to determine what portion of work
   * it needs to do, d) and e) must be used to determine, for example, what local memory should be used, if
   * temporary storage has been allocated for each thread. Because the number of tasks typically exceeds the
   * number of threads, this is more efficient than allocating temporary storage for each task.
   *
   * Note that this is a plain function pointer type, so only free functions and static member functions
   * can be used as task functions.
   */
  typedef void (*TaskFunction)( void *const args /*!< Pointer to parameter block for this task.*/,
				const size_t taskIdx /*!< Index of this task.*/,
				const size_t taskCnt /*!< Number of tasks.*/,
				const size_t threadIdx /*!< Index of the thread that is running this task.*/,
				const size_t threadCnt /*!< Number of threads in this pool.*/ );
  
  /** Constructor: create a pool of nThreads running threads.
   *\param nThreads Number of threads to create for this pool. By default, the number
   * of threads created is the current number of available threads, i.e., typically
   * the number of CPUs minus the number of currently running threads, if any.
   * NOTE(review): the default argument is 0, which presumably acts as the sentinel that
   * selects this default behaviour -- confirm in the constructor implementation.
   */
  ThreadPoolThreads( const size_t nThreads = 0 );

  /// Destructor: stop all running threads.
  ~ThreadPoolThreads();

  /** Return number of threads in the pool.
   *\return The current value of m_NumberOfThreads.
   */
  size_t GetNumberOfThreads() const
  {
    return this->m_NumberOfThreads;
  }

  /** Run actual worker functions through running threads.
   *\tparam TParam Type of the per-task parameter blocks stored in the taskParameters vector.
   *
   * NOTE(review): the template is only declared here; its definition is presumably in
   * cmtkThreadPoolThreads.txx, which is included at the end of this header. How each TParam
   * element is turned into the "void*" handed to the task function, and whether a value of 0
   * for numberOfTasksOverride means "use taskParameters.size()", should be confirmed there.
   */
  template<class TParam> 
  void Run( Self::TaskFunction taskFunction /*!< Pointer to task function.*/,
	    std::vector<TParam>& taskParameters /*!< Vector of task parameter blocks, one per task.*/,
	    const size_t numberOfTasksOverride = 0 /*!< This can be used to run a smaller number of tasks than taskParameters.size(), which is useful to allow re-use of larger, allocated vector.*/ );

  /** Get reference to global thread pool.
   * This is shared by all functions in the process and allows re-use of the same "physical" threads
   * for all types of computations. The thread pool itself is a local static instance within this
   * function, thus making sure it is initialized properly (see Effective C++, 3rd, Item 4).
   *\return Reference to the process-wide thread pool instance.
   */
  static Self& GetGlobalThreadPool();

  /** Thread function arguments: identify pool and index of thread in it.
   * Cannot be "private" because we need this in a "C" linkage function, which cannot be
   * a "friend" of this class.
   */
  class ThreadPoolThreadsArg
  {
  public:
    /// The thread pool that the thread belongs to.
    ThreadPoolThreads* m_Pool;

    /// Index of thread in pool.
    size_t m_Index;
  };

  /** This function is run as a thread.
   * Cannot be "private" because we need to call this from a "C" linkage function, which cannot be
   * a "friend" of this class.
   */
  void ThreadFunction( const size_t threadIdx /*!< Index of the actual thread in the pool. */ );
  
private:
  /// Semaphore to signal running threads when tasks are waiting.
  ThreadSemaphore m_TaskWaitingSemaphore;

  /// Semaphore that threads use to signal when they are ready for the next task.
  ThreadSemaphore m_ThreadWaitingSemaphore;

  /// Total number of tasks to execute.
  size_t m_NumberOfTasks;

  /// Index of next available task.
  size_t m_NextTaskIndex;

  /// Lock to ensure exclusive access to the task index counter.
  MutexLock m_NextTaskIndexLock;

  /// The current task function.
  Self::TaskFunction m_TaskFunction;

  /// Task function parameter pointers (type-erased to "void*", matching the first argument of TaskFunction).
  std::vector<void*> m_TaskParameters;

  /// Thread function parameters (pool pointer and thread index for each thread).
  std::vector<Self::ThreadPoolThreadsArg> m_ThreadArgs;
  
  /// Number of running threads; this is the value returned by GetNumberOfThreads().
  size_t m_NumberOfThreads;

#ifdef CMTK_USE_SMP
  /// Thread handles (only present in builds with CMTK_USE_SMP defined).
  std::vector<ThreadIDType> m_ThreadID;
  
#ifdef _MSC_VER
  /// Windows thread handles (only present when additionally compiling with MSVC).
  std::vector<HANDLE> m_ThreadHandles;
#endif
#endif
  
  /// Flag whether threads for this pool are running.
  bool m_ThreadsRunning;

  /** Flag whether threads should continue or terminate.
   * This flag is changed when the thread pool is destructed, to wind down all running
   * threads gracefully.
   * NOTE(review): this comment previously said the flag is set to "true" on destruction. Given
   * the flag's name, the terminating value is more likely "false" -- confirm against the
   * implementation of EndThreads() and ThreadFunction().
   */
  bool m_ContinueThreads;

  /// Start threads for this pool.
  void StartThreads();

  /// End threads for this pool.
  void EndThreads();
};

} // namespace cmtk

#include "cmtkThreadPoolThreads.txx"

#endif // #ifndef __cmtkThreadPoolThreads_h_included_