1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
/**********************************************************************
* $Id$
*
* Project: CPL - Common Portability Library
* Purpose: CPL worker thread pool
* Author: Even Rouault, <even dot rouault at spatialys dot com>
*
**********************************************************************
* Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
*
* SPDX-License-Identifier: MIT
****************************************************************************/
#ifndef CPL_WORKER_THREAD_POOL_H_INCLUDED_
#define CPL_WORKER_THREAD_POOL_H_INCLUDED_
#include "cpl_multiproc.h"
#include "cpl_list.h"
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>
/**
* \file cpl_worker_thread_pool.h
*
* Class to manage a pool of worker threads.
* @since GDAL 2.1
*/
#ifndef DOXYGEN_SKIP
// Forward declaration so that CPLWorkerThread can hold a pointer to a pool.
class CPLWorkerThreadPool;
/** Bookkeeping record for one worker thread.
 *
 * CPLWorkerThreadPool owns these records through its aWT vector of
 * std::unique_ptr<CPLWorkerThread>. Every member has an in-class
 * initializer, so a default-constructed record starts with null pointers
 * and bMarkedAsWaiting == false.
 */
struct CPLWorkerThread
{
// Macro defined outside this header; judging by its name it forbids copy
// construction and copy assignment -- confirm in its defining header.
CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThread)
CPLWorkerThread() = default;
// Presumably the per-thread initialization callback handed to
// CPLWorkerThreadPool::Setup(); null by default.
CPLThreadFunc pfnInitFunc = nullptr;
// Presumably the argument passed to pfnInitFunc (one entry of Setup()'s
// pasInitData array) -- confirm against the implementation.
void *pInitData = nullptr;
// Raw pointer to a pool; presumably the pool this thread belongs to.
CPLWorkerThreadPool *poTP = nullptr;
// Handle of the underlying thread (type declared outside this header).
CPLJoinableThread *hThread = nullptr;
// Presumably true while this record is registered in the pool's list of
// waiting worker threads -- confirm against the implementation.
bool bMarkedAsWaiting = false;
// Mutex / condition variable pair private to this record; presumably used
// to park this thread and wake it individually -- confirm.
std::mutex m_mutex{};
std::condition_variable m_cv{};
};
/** State value stored in CPLWorkerThreadPool::eState.
 *
 * Unscoped on purpose: the enumerators stay visible at namespace scope
 * under their existing CPLWTS_ names.
 */
enum CPLWorkerThreadState
{
    /** Value eState is initialized with. */
    CPLWTS_OK = 0,
    /** Presumably requests that worker threads stop -- confirm in the
     * implementation. */
    CPLWTS_STOP = 1,
    /** Presumably records that the pool hit an error -- confirm in the
     * implementation. */
    CPLWTS_ERROR = 2
};
#endif // ndef DOXYGEN_SKIP
// Forward declaration; the CPLJobQueue class itself is defined further down
// in this header, after CPLWorkerThreadPool (which only needs the name here).
class CPLJobQueue;
/// Unique pointer to a job queue. This is the type returned by
/// CPLWorkerThreadPool::CreateJobQueue().
using CPLJobQueuePtr = std::unique_ptr<CPLJobQueue>;
/** Pool of worker threads.
 *
 * Work is submitted either as a std::function<void()> or as a
 * CPLThreadFunc with an opaque data pointer, one job at a time
 * (SubmitJob()) or as a batch sharing one function (SubmitJobs()).
 * WaitCompletion() and WaitEvent() are the waiting entry points, and
 * CreateJobQueue() returns a CPLJobQueue wrapped in a CPLJobQueuePtr.
 *
 * Instances cannot be copied: the class has std::mutex and
 * std::condition_variable members, and CPL_DISALLOW_COPY_ASSIGN
 * presumably states this explicitly.
 */
class CPL_DLL CPLWorkerThreadPool
{
CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThreadPool)
// Owned per-thread records.
std::vector<std::unique_ptr<CPLWorkerThread>> aWT{};
// mutable, presumably so that GetThreadCount() const can lock it.
mutable std::mutex m_mutex{};
std::condition_variable m_cv{};
// Starts as CPLWTS_OK.
// NOTE(review): volatile gives no inter-thread synchronization in C++;
// this is only safe if every access happens under a lock (presumably
// m_mutex). Confirm in the implementation, or consider std::atomic.
volatile CPLWorkerThreadState eState = CPLWTS_OK;
// Queue of callables waiting to be run.
// NOTE(review): unlike the other members this one has no {} initializer;
// std::queue default-constructs to empty, so this is only a stylistic
// inconsistency.
std::queue<std::function<void()>> jobQueue;
// Presumably the number of submitted jobs that have not finished yet
// (see DeclareJobFinished()) -- confirm against the implementation.
int nPendingJobs = 0;
// CPLList (declared outside this header) of waiting worker threads, and
// presumably the length of that list.
CPLList *psWaitingWorkerThreadsList = nullptr;
int nWaitingWorkerThreads = 0;
// Presumably an upper bound on the number of threads; set to 0 here.
int m_nMaxThreads = 0;
// Static, takes a single void*: presumably the thread entry point, with
// user_data pointing to the thread's CPLWorkerThread -- confirm.
static void WorkerThreadFunction(void *user_data);
// Presumably called when a job has finished running, to update
// nPendingJobs and notify waiters -- confirm.
void DeclareJobFinished();
// Returns a callable for the given worker thread; presumably the next
// entry of jobQueue -- the waiting/termination behavior is defined in the
// implementation.
std::function<void()> GetNextJob(CPLWorkerThread *psWorkerThread);
public:
/** Default constructor (behavior defined in the implementation file). */
CPLWorkerThreadPool();
/** Constructor taking a thread count; explicit, so an int does not
 * implicitly convert to a pool. */
explicit CPLWorkerThreadPool(int nThreads);
/** Destructor (behavior defined in the implementation file). */
~CPLWorkerThreadPool();
/** Set up the pool.
 *
 * @param nThreads presumably the number of worker threads wanted.
 * @param pfnInitFunc presumably an optional per-thread initialization
 *                    callback.
 * @param pasInitData presumably an array with one init argument per
 *                    thread -- confirm against the implementation.
 * @return a bool, presumably true on success.
 */
bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData);
/** Overload of Setup() with an extra flag.
 *
 * @param bWaitallStarted presumably whether to wait until all threads
 *                        have started before returning -- confirm.
 */
bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData,
bool bWaitallStarted);
/** Create a job queue, returned as a std::unique_ptr<CPLJobQueue>. */
CPLJobQueuePtr CreateJobQueue();
/** Submit one job given as a callable taken by value.
 * @return a bool, presumably true if the job was accepted. */
bool SubmitJob(std::function<void()> task);
/** Submit one job given as a function plus an opaque data pointer.
 * @return a bool, presumably true if the job was accepted. */
bool SubmitJob(CPLThreadFunc pfnFunc, void *pData);
/** Submit a batch of jobs sharing pfnFunc; presumably one job per
 * element of apData.
 * @return a bool, presumably true if the jobs were accepted. */
bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector<void *> &apData);
/** Wait for jobs to complete.
 * @param nMaxRemainingJobs defaults to 0; presumably the call returns
 *        once no more than this many jobs remain pending -- confirm. */
void WaitCompletion(int nMaxRemainingJobs = 0);
/** Wait for an event (presumably a job finishing -- confirm). Returns
 * nothing, whereas CPLJobQueue::WaitEvent() returns bool. */
void WaitEvent();
/** Return the number of threads setup */
int GetThreadCount() const;
};
/** Job queue.
 *
 * Keeps a pointer to a CPLWorkerThreadPool together with its own mutex,
 * condition variable and pending-job counter. The constructor is
 * protected and CPLWorkerThreadPool is a friend, so instances are meant
 * to be obtained from the pool (CPLWorkerThreadPool::CreateJobQueue()
 * returns a CPLJobQueuePtr).
 */
class CPL_DLL CPLJobQueue
{
CPL_DISALLOW_COPY_ASSIGN(CPLJobQueue)
// Pool given at construction; a raw pointer, so the pool presumably has
// to outlive this queue -- confirm.
CPLWorkerThreadPool *m_poPool = nullptr;
std::mutex m_mutex{};
std::condition_variable m_cv{};
// Presumably the number of jobs submitted through this queue that have
// not finished yet -- confirm against the implementation.
int m_nPendingJobs = 0;
// Presumably called when one of this queue's jobs has finished, to update
// m_nPendingJobs and notify waiters -- confirm.
void DeclareJobFinished();
//! @cond Doxygen_Suppress
protected:
friend class CPLWorkerThreadPool;
// Explicit, so a pool pointer does not implicitly convert to a queue.
explicit CPLJobQueue(CPLWorkerThreadPool *poPool);
//! @endcond
public:
/** Destructor (behavior defined in the implementation file). */
~CPLJobQueue();
/** Return the owning worker thread pool */
// NOTE(review): this accessor only reads m_poPool but is not declared
// const, so it cannot be called on a const CPLJobQueue.
CPLWorkerThreadPool *GetPool()
{
return m_poPool;
}
/** Submit one job given as a function plus an opaque data pointer.
 * @return a bool, presumably true if the job was accepted. */
bool SubmitJob(CPLThreadFunc pfnFunc, void *pData);
/** Submit one job given as a callable taken by value.
 * @return a bool, presumably true if the job was accepted. */
bool SubmitJob(std::function<void()> task);
/** Wait for jobs to complete.
 * @param nMaxRemainingJobs defaults to 0; presumably the call returns
 *        once no more than this many jobs of this queue remain pending
 *        -- confirm. */
void WaitCompletion(int nMaxRemainingJobs = 0);
/** Wait for an event (presumably a job of this queue finishing).
 * @return a bool whose meaning is defined in the implementation file;
 *         note that CPLWorkerThreadPool::WaitEvent() returns void. */
bool WaitEvent();
};
#endif // CPL_WORKER_THREAD_POOL_H_INCLUDED_
|