File: FXThreadPool.h

package info (click to toggle)
gogglesmm 1.2.5-6
  • links: PTS
  • area: main
  • in suites: forky, sid
  • size: 16,812 kB
  • sloc: cpp: 231,960; ansic: 893; xml: 222; makefile: 33
file content (219 lines) | stat: -rw-r--r-- 9,457 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
/********************************************************************************
*                                                                               *
*                             T h r e a d   P o o l                             *
*                                                                               *
*********************************************************************************
* Copyright (C) 2006,2022 by Jeroen van der Zijp.   All Rights Reserved.        *
*********************************************************************************
* This library is free software; you can redistribute it and/or modify          *
* it under the terms of the GNU Lesser General Public License as published by   *
* the Free Software Foundation; either version 3 of the License, or             *
* (at your option) any later version.                                           *
*                                                                               *
* This library is distributed in the hope that it will be useful,               *
* but WITHOUT ANY WARRANTY; without even the implied warranty of                *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the                 *
* GNU Lesser General Public License for more details.                           *
*                                                                               *
* You should have received a copy of the GNU Lesser General Public License      *
* along with this program.  If not, see <http://www.gnu.org/licenses/>          *
********************************************************************************/
#ifndef FXTHREADPOOL_H
#define FXTHREADPOOL_H

#ifndef FXRUNNABLE_H
#include "FXRunnable.h"
#endif

namespace FX {


class FXWorker;

/// Task queue
typedef FXLFQueueOf<FXRunnable> FXTaskQueue;


/**
* A Thread Pool manages execution of tasks on a number of worker-threads.
*
* A task executed by the thread pool is queued up until a worker-thread becomes available
* to run it.
* To accomodate fluctuations in workloads and minimize resources, the number of worker-
* threads can be allowed to vary between a minimum and a maximum number.
* Idle worker-threads which receive no assignments within a specified amount of time will
* automatically terminate, thereby reduce system-resources used by the program.
* By default, the minimum number of worker-threads is 1, and the maximum number of worker-
* threads is equal to the number of processors in the system.
* During peak workloads, when the task queue may start to fill up, and no new worker-
* threads can be created, the calling thread to block until there is room in the queue
* for more tasks.
* However, if a non-default value is passed for the blocking-parameter to execute(), the
* calling thread will be blocked for only a finite amount of time (non-zero blocking value)
* or return immediately (zero blocking value).
* Failure to queue up a new task will result in execute() returning a false.
* The tasks which are passed to the thread pool are derived from FXRunnable.  In order
* to perform some useful function, subclasses of FXRunnable should overload the run()
* function.
* Uncaught exceptions thrown by a task are intercepted by the thread pool and rethrown
* after the necessary cleanup, and cause the worker thread to end prematurely.
* When the thread pool is stopped, it will wait until all tasks are finished, and then
* cause all worker-threads to terminate.
* The thread pool becomes associated (through a thread-local variable) with the calling
* thread when start() is called; this association lasts until stop() is called.
* In addition, each worker will similarly be associated with the thread pool.
* Thus both the main thread as well as the worker threads can easily find the thread
* pool through the member-function instance().
*/
class FXAPI FXThreadPool : public FXRunnable {
private:
  FXTaskQueue     queue;        // Task queue
  FXCompletion    tasks;        // Active tasks
  FXCompletion    threads;      // Active threads
  FXSemaphore     freeslots;    // Free slots in queue
  FXSemaphore     usedslots;    // Used slots in queue
  FXuval          stacksize;    // Stack size
  FXTime          expiration;   // Quit if no task within this time
  volatile FXuint maximum;      // Maximum threads
  volatile FXuint minimum;      // Minimum threads
  volatile FXuint workers;      // Working threads
  volatile FXuint running;      // Context is running
private:
  static FXAutoThreadStorageKey reference;
private:
  FXbool startWorker();
  void runWhile(FXCompletion& comp,FXTime timeout);
  virtual FXint run();
private:
  FXThreadPool(const FXThreadPool&);
  FXThreadPool &operator=(const FXThreadPool&);
public:

  /**
  * Construct an empty thread pool, with given task-queue size.
  */
  FXThreadPool(FXuint sz=256);

  /// Return true if running
  FXbool active() const { return running==1; }

  /// Change task queue size, return true if success
  FXbool setSize(FXuint sz);

  /// Return task queue size
  FXuint getSize() const { return queue.getSize(); }

  /// Return number of tasks
  FXuint getRunningTasks() const { return tasks.count(); }

  /// Return number of threads
  FXuint getRunningThreads() const { return threads.count(); }

  /// Change minimum number of worker threads; default is 1
  FXbool setMinimumThreads(FXuint n);

  /// Return minimum number of worker threads
  FXuint getMinimumThreads() const { return minimum; }

  /// Change maximum number of worker threads; default is #processors
  FXbool setMaximumThreads(FXuint n);

  /// Return maximum number of worker threads
  FXuint getMaximumThreads() const { return maximum; }

  /// Change expiration time for excess workers to terminate
  FXbool setExpiration(FXTime ns=forever);

  /// Get expiration time
  FXTime getExpiration() const { return expiration; }

  /// Change stack size (0 for default stack size)
  FXbool setStackSize(FXuval sz);

  /// Get stack size
  FXuval getStackSize() const { return stacksize; }

  /// Return calling thread's thread pool
  static FXThreadPool* instance();

  /// Change calling thread's thread pool
  static void instance(FXThreadPool *pool);

  /**
  * Start the thread pool with an initial number of threads equal to count.
  * Returns the number of threads actually started.
  * An association will be established between the calling thread and the thread pool.
  * This association lasts until stop() is called.  If another threadpool was already started
  * before by the calling thread, no new association will be established.
  */
  FXuint start(FXuint count=0);

  /**
  * Execute a task on the thread pool by entering it into the queue.
  * If a spot becomes available in the task queue within the timeout interval,
  * add the task to the queue and return true.
  * Return false if the task could not be added within the given time interval.
  * Possibly starts additional worker threads if the maximum number of worker
  * threads has not yet been exceeded.
  */
  FXbool execute(FXRunnable* task,FXTime blocking=forever);

  /**
  * Execute task on the thread pool by entering int into the queue.
  * If the task was successfully added, the calling thread will temporarily enter
  * the task-processing loop, and help out the worker-threads until all tasks
  * have finished processing.
  * Return false if the task could not be added within the given time interval.
  * Possibly starts additional worker threads if the maximum number of worker
  * threads has not yet been exceeded.
  */
  FXbool executeAndWait(FXRunnable* task,FXTime blocking=forever);

  /**
  * Execute task on the thread pool by entering int into the queue.
  * If the task was successfully added, the calling thread will temporarily enter
  * the task-processing loop, and help out the worker-threads until the completion
  * becomes signaled.
  * Return false if the task could not be added within the given time interval.
  * Possibly starts additional worker threads if the maximum number of worker
  * threads has not yet been exceeded.
  */
  FXbool executeAndWaitFor(FXRunnable* task,FXCompletion& comp,FXTime blocking=forever);

  /**
  * Wait until task queue becomes empty and all tasks are finished, and process tasks
  * to help the worker threads in the meantime.
  * If the thread pool was not running, return immediately with false; otherwise,
  * return when the queue is empty and all tasks have finished.
  */
  FXbool wait();

  /**
  * Wait until completion becomes signaled, and process tasks to help the worker
  * threads in the meantime.
  * If the thread pool was not running, return immediately with false; otherwise,
  * return when the completion becomes signaled, or when the thread pool is stopped.
  * Immediately return with false if the thread pool wasn't running.
  */
  FXbool waitFor(FXCompletion& comp);

  /**
  * Stop thread pool, and block posting of any new tasks to the queue.
  * Enter the task-processing loop and help the worker-threads until the task queue is
  * empty, and all tasks have finished executing.
  * The association between the calling thread, established when start() was called,
  * will hereby be dissolved, if the calling thread was associated with this thread pool.
  * Return false if the thread pool wasn't running.
  */
  FXbool stop();

  /**
  * Stop the thread pool and then delete it.
  */
  virtual ~FXThreadPool();
  };


}

#endif