/* Copyright (C) 2006-2016 J.F.Dockes
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA
*/
#ifndef _WORKQUEUE_H_INCLUDED_
#define _WORKQUEUE_H_INCLUDED_
#include <chrono>
#include <thread>
#include <string>
#include <queue>
#include <list>
#include <mutex>
#include <condition_variable>
#ifdef MDU_INCLUDE_LOG
#include MDU_INCLUDE_LOG
#else
#include "log.h"
#endif
using namespace std::chrono_literals;
/**
* A WorkQueue manages the synchronisation around a queue of work items,
* where a number of client threads queue tasks and a number of worker
* threads take and execute them. The goal is to introduce some level
* of parallelism between the successive steps of a previously single
* threaded pipeline. For example data extraction / data preparation / index
* update, but this could have other uses.
*
* There is no individual task status return. In case of fatal error,
* the client or worker sets an end condition on the queue. A second
* queue could conceivably be used for returning individual task
* status.
*
* The unusual argument and return types of the thread function
* come from compatibility with an earlier pthread-based
* implementation.
*/
template <class T> class WorkQueue {
public:
    /** Create a WorkQueue
     * @param name for message printing
     * @param hi number of tasks on queue before clients block. Default 0
     *    meaning no limit. The value is unsigned, so passing -1 wraps to the
     *    largest size_t and does not disable the queue.
     * @param lo minimum count of tasks before a worker starts. Default 1.
     *    A value of 0 is treated as 1, because take() must never read from
     *    an empty queue.
     */
    WorkQueue(const std::string& name, size_t hi = 0, size_t lo = 1)
        : m_name(name), m_high(hi), m_low(lo > 0 ? lo : 1) {
    }

    /** Terminate and join the worker threads if some are still registered. */
    ~WorkQueue() {
        if (!m_worker_threads.empty()) {
            setTerminateAndWait();
        }
    }

    WorkQueue(const WorkQueue&) = delete;
    WorkQueue& operator=(const WorkQueue&) = delete;

    /** Task deleter
     *
     * If put() is called with the flush option, and the tasks allocate memory,
     * you need to set this function, which will be called on each task popped
     * from the queue by the flush. Tasks which go through normally must be
     * freed by the worker function.
     */
    void setTaskFreeFunc(void (*func)(T&)) {
        m_taskfreefunc = func;
    }

    /** Forbid inputting new tasks.
     *
     * This is mostly useful for abnormal terminations as some data will
     * probably be lost, depending on how the upstream handles the put() error.
     * The flag is not reset by setTerminateAndWait().
     */
    void closeShop() {
        // put() reads the flag with the mutex held: write it under the same
        // mutex so that the access is not a data race.
        std::lock_guard<std::mutex> lock(m_mutex);
        m_openforbusiness = false;
    }

    /** Start the worker threads.
     *
     * @param nworkers number of thread copies to start.
     * @param workproc thread function. It should loop
     *   taking (WorkQueue::take()) and executing tasks. Its return value is
     *   not collected.
     * @param arg initial parameter to thread function.
     * @return always true. A thread creation failure is reported by
     *   std::thread through an exception.
     */
    bool start(int nworkers, void *(workproc)(void *), void *arg) {
        // The lock is held during the whole loop: workers calling into the
        // queue will block until all threads are registered.
        std::lock_guard<std::mutex> lock(m_mutex);
        for (int i = 0; i < nworkers; i++) {
            m_worker_threads.push_back(Worker{std::thread(workproc, arg)});
        }
        return true;
    }

    /** Add item to work queue, called from client.
     *
     * Sleeps if there are already too many.
     *
     * @param t the task to queue.
     * @param flushprevious if true, discard all tasks currently on the queue
     *   (calling the task free function on each, if set) before queuing t.
     * @return false if the queue is not ok or is closed for business, in
     *   which case the task was not queued. True else.
     */
    bool put(T t, bool flushprevious = false) {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (!ok() || !m_openforbusiness) {
            LOGERR("WorkQueue::put: " << m_name << ": ok: " << ok() << " openforbusiness " <<
                   m_openforbusiness << "\n");
            return false;
        }
        LOGDEB2("WorkQueue::put: " << m_name << "\n");

        while (ok() && m_high > 0 && m_queue.size() >= m_high) {
            m_clientsleeps++;
            m_clients_waiting++;
            m_ccond.wait(lock);
            m_clients_waiting--;
            // Keep the order: we test ok() AFTER the sleep...
            if (!ok()) {
                return false;
            }
        }

        if (flushprevious) {
            while (!m_queue.empty()) {
                if (m_taskfreefunc) {
                    m_taskfreefunc(m_queue.front());
                }
                m_queue.pop();
            }
        }

        // t is our own copy: move it into the queue instead of copying again.
        m_queue.push(std::move(t));
        if (m_workers_waiting > 0) {
            // Just wake one worker, there is only one new task.
            m_wcond.notify_one();
        } else {
            m_nowake++;
        }
        return true;
    }

    /** Wait until the queue is inactive. Called from client.
     *
     * Waits until the task queue is empty and the workers are all
     * back sleeping (or exited). Used by the client to wait for all current work
     * to be completed, when it needs to perform work that couldn't be
     * done in parallel with the worker's tasks, or before shutting
     * down. Work can be resumed after calling this. Note that the
     * only thread which can call it safely is the client just above
     * (which can control the task flow), else there could be
     * tasks in the intermediate queues.
     * To rephrase: there is no warranty on return that the queue is actually
     * idle EXCEPT if the caller knows that no jobs are still being created.
     * It would be possible to transform this into a safe call if some kind
     * of suspend condition was set on the queue by waitIdle(), to be reset by
     * some kind of "resume" call. Not currently the case.
     *
     * @return the queue status (ok()) at the time the wait ended.
     */
    bool waitIdle() {
        std::unique_lock<std::mutex> lock(m_mutex);
        // We're not done while:
        // - the queue is not empty and we have some workers left
        // - OR some workers are working (not exited or back waiting for a task).
        while ((m_queue.size() > 0 && m_workers_exited < m_worker_threads.size()) ||
               (m_workers_waiting + m_workers_exited) < m_worker_threads.size()) {
            LOGDEB0("waitIdle: " << m_name << " qsz " << m_queue.size() <<
                    " wwaiting " << m_workers_waiting << " wexit " << m_workers_exited << " nthr " <<
                    m_worker_threads.size() << "\n");
            m_clients_waiting++;
            m_ccond.wait(lock);
            m_clients_waiting--;
        }
        return ok();
    }

    /** Tell the workers to exit, and wait for them.
     *
     * Does not bother about tasks possibly remaining on the queue, so
     * should be called after waitIdle() for an orderly shutdown.
     *
     * @return null if there was no worker thread to wait for (e.g. already
     *   called), else (void*)1. The return values of the worker functions
     *   are not available from std::thread, so they can't influence the result.
     */
    void *setTerminateAndWait() {
        std::unique_lock<std::mutex> lock(m_mutex);
        LOGDEB("setTerminateAndWait:" << m_name << "\n");

        if (m_worker_threads.empty()) {
            // Already called ?
            return nullptr;
        }

        // Wait for all worker threads to have called workerExit()
        m_ok = false;
        while (m_workers_exited < m_worker_threads.size()) {
            m_wcond.notify_all();
            m_clients_waiting++;
            m_ccond.wait(lock);
            m_clients_waiting--;
        }

        LOGDEB(m_name << ": tasks " << m_tottasks << " nowakes " << m_nowake << " wsleeps " <<
               m_workersleeps << " csleeps " << m_clientsleeps << "\n");

        // Perform the thread joins.
        while (!m_worker_threads.empty()) {
            m_worker_threads.front().thr.join();
            m_worker_threads.pop_front();
        }

        // Reset to start state.
        m_workers_exited = m_clients_waiting = m_workers_waiting =
            m_tottasks = m_nowake = m_workersleeps = m_clientsleeps = 0;
        m_ok = true;

        LOGDEB("setTerminateAndWait:" << m_name << " done\n");
        return reinterpret_cast<void *>(1);
    }

    /** Take task from queue. Called from worker.
     *
     * Sleeps if there are not enough. Signal if we go to sleep on empty
     * queue: client may be waiting for our going idle.
     *
     * @param tp output: the task taken from the queue. On timeout, set to a
     *   value-initialised T (nullptr when T is a pointer type).
     * @param szp if not null, receives the queue size as it was just before
     *   the task was removed. Not set on timeout.
     * @param waitdur maximum duration for each wait for tasks. A negative
     *   value (the default) means no timeout.
     * @return false if the queue is not ok (normal when shutting down).
     *   True if a task was taken, or if the wait timed out.
     */
    bool take(T* tp, size_t *szp = nullptr,
              std::chrono::duration<double> waitdur = std::chrono::milliseconds(-1)) {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (!ok()) {
            LOGDEB("WorkQueue::take:" << m_name << ": not ok\n");
            return false;
        }

        while (ok() && m_queue.size() < m_low) {
            m_workersleeps++;
            m_workers_waiting++;
            if (m_queue.empty()) {
                m_ccond.notify_all();
            }

            bool timedout = false;
            if (waitdur < std::chrono::duration<double>::zero()) {
                m_wcond.wait(lock);
            } else {
                timedout = m_wcond.wait_for(lock, waitdur) == std::cv_status::timeout;
            }
            m_workers_waiting--;

            if (timedout) {
                // Value-initialisation yields nullptr for pointer tasks, and
                // also works for task types which can't be assigned nullptr.
                *tp = T();
                return true;
            }
            if (!ok()) {
                // !ok is a normal condition when shutting down
                return false;
            }
        }

        m_tottasks++;
        if (szp) {
            *szp = m_queue.size();
        }
        // The front element is popped just after: move it out.
        *tp = std::move(m_queue.front());
        m_queue.pop();

        if (m_clients_waiting > 0) {
            // No reason to wake up more than one client thread
            m_ccond.notify_one();
        } else {
            m_nowake++;
        }
        return true;
    }

    /** Wait until at least sz tasks are on the queue. Called from worker.
     *
     * Does not take anything from the queue. The caller is counted as a
     * waiting worker while it sleeps.
     *
     * @param sz minimum queue size to wait for.
     * @return false if the queue is not ok, true when the size is reached.
     */
    bool waitminsz(size_t sz) {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (!ok()) {
            return false;
        }
        while (ok() && m_queue.size() < sz) {
            m_workersleeps++;
            m_workers_waiting++;
            if (m_queue.empty()) {
                m_ccond.notify_all();
            }
            m_wcond.wait(lock);
            m_workers_waiting--;
            if (!ok()) {
                return false;
            }
        }
        return true;
    }

    /** Advertise exit and abort queue. Called from worker
     *
     * This would happen after an unrecoverable error, or when
     * the queue is terminated by the client. Workers never exit normally,
     * except when the queue is shut down (at which point m_ok is set to
     * false by the shutdown code anyway). The thread must return/exit
     * immediately after calling this.
     */
    void workerExit() {
        LOGDEB("workerExit:" << m_name << "\n");
        std::lock_guard<std::mutex> lock(m_mutex);
        m_workers_exited++;
        m_ok = false;
        // Wake up the clients: they may be waiting in put(), waitIdle() or
        // setTerminateAndWait().
        m_ccond.notify_all();
    }

    /** Return the current count of queued tasks. */
    size_t qsize() {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_queue.size();
    }

private:
    /** Check the queue status. Must be called with the mutex held.
     *
     * The queue is ok if no fatal condition was set, no worker has exited,
     * and there is at least one worker thread.
     */
    bool ok() const {
        const bool isok = m_ok && m_workers_exited == 0 && !m_worker_threads.empty();
        if (!isok) {
            LOGDEB("WorkQueue:ok:" << m_name << ": not ok m_ok " << m_ok <<
                   " m_workers_exited " << m_workers_exited <<
                   " m_worker_threads size " << m_worker_threads.size() <<
                   "\n");
        }
        return isok;
    }

    struct Worker {
        std::thread thr;
    };

    // Called on tasks discarded by a flushing put(), if set.
    void (*m_taskfreefunc)(T&){nullptr};
    // Configuration
    std::string m_name;
    size_t m_high;
    size_t m_low;

    // Worker threads having called exit. Used to decide when we're done
    unsigned int m_workers_exited{0};
    // Status
    bool m_ok{true};
    // Accepting new tasks
    bool m_openforbusiness{true};

    // Our threads.
    std::list<Worker> m_worker_threads;

    // Jobs input queue
    std::queue<T> m_queue;

    // Synchronization: m_ccond wakes up clients, m_wcond wakes up workers.
    std::condition_variable m_ccond;
    std::condition_variable m_wcond;
    std::mutex m_mutex;

    // Client/Worker threads currently waiting for a job
    unsigned int m_clients_waiting{0};
    unsigned int m_workers_waiting{0};

    // Statistics
    unsigned int m_tottasks{0};
    unsigned int m_nowake{0};
    unsigned int m_workersleeps{0};
    unsigned int m_clientsleeps{0};
};
#endif /* _WORKQUEUE_H_INCLUDED_ */