File: cpl_worker_thread_pool.h

package info (click to toggle)
gdal 3.10.3%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 87,476 kB
  • sloc: cpp: 1,151,435; ansic: 215,362; python: 26,401; java: 5,972; xml: 4,596; sh: 3,263; cs: 2,503; yacc: 1,090; makefile: 289
file content (139 lines) | stat: -rw-r--r-- 3,627 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/**********************************************************************
 * $Id$
 *
 * Project:  CPL - Common Portability Library
 * Purpose:  CPL worker thread pool
 * Author:   Even Rouault, <even dot rouault at spatialys dot com>
 *
 **********************************************************************
 * Copyright (c) 2015, Even Rouault, <even dot rouault at spatialys dot com>
 *
 * SPDX-License-Identifier: MIT
 ****************************************************************************/

#ifndef CPL_WORKER_THREAD_POOL_H_INCLUDED_
#define CPL_WORKER_THREAD_POOL_H_INCLUDED_

#include "cpl_multiproc.h"
#include "cpl_list.h"

#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>

/**
 * \file cpl_worker_thread_pool.h
 *
 * Class to manage a pool of worker threads.
 * @since GDAL 2.1
 */

#ifndef DOXYGEN_SKIP
class CPLWorkerThreadPool;

// Bookkeeping record for a single worker thread. CPLWorkerThreadPool keeps
// these in a std::vector<std::unique_ptr<CPLWorkerThread>>.
// The record is not copyable; every field starts out null/false.
struct CPLWorkerThread
{
    CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThread)
    CPLWorkerThread() = default;

    // Per-thread initialization callback and its argument.
    // NOTE(review): presumably supplied through CPLWorkerThreadPool::Setup()
    // - confirm in the implementation file.
    CPLThreadFunc pfnInitFunc{nullptr};
    void *pInitData{nullptr};

    // Non-owning back-pointer to a pool (presumably the one this thread
    // belongs to).
    CPLWorkerThreadPool *poTP{nullptr};

    // Handle of the joinable thread associated with this record.
    CPLJoinableThread *hThread{nullptr};

    // Flag; judging by its name, set while the thread is registered as
    // waiting for work - verify against the implementation.
    bool bMarkedAsWaiting{false};

    // Mutex / condition variable pair private to this record.
    std::mutex m_mutex{};
    std::condition_variable m_cv{};
};

// State values for a worker thread pool (type of
// CPLWorkerThreadPool::eState). The numeric values are spelled out
// explicitly; they are the same ones the compiler would assign implicitly.
typedef enum
{
    CPLWTS_OK = 0,    // initial value of CPLWorkerThreadPool::eState
    CPLWTS_STOP = 1,  // presumably: the pool has been asked to stop
    CPLWTS_ERROR = 2  // presumably: an error occurred in the pool
} CPLWorkerThreadState;
#endif  // ndef DOXYGEN_SKIP

// Forward declaration: CPLJobQueue is defined further down in this header,
// but CPLWorkerThreadPool::CreateJobQueue() needs to name it first.
class CPLJobQueue;
/// Unique (owning) pointer to a job queue, as returned by
/// CPLWorkerThreadPool::CreateJobQueue().
using CPLJobQueuePtr = std::unique_ptr<CPLJobQueue>;

/** Pool of worker threads */
/** Pool of worker threads.
 *
 * Jobs are submitted either as std::function<void()> tasks or as a
 * CPLThreadFunc callback plus a user-data pointer. The pool is not copyable.
 */
class CPL_DLL CPLWorkerThreadPool
{
    CPL_DISALLOW_COPY_ASSIGN(CPLWorkerThreadPool)

    // Per-thread records, owned by the pool.
    std::vector<std::unique_ptr<CPLWorkerThread>> aWT{};

    // Pool-wide mutex and condition variable. The mutex is mutable so that
    // it can also be locked from const member functions.
    mutable std::mutex m_mutex{};
    std::condition_variable m_cv{};

    // Current pool state.
    // NOTE(review): 'volatile' does not provide inter-thread synchronization
    // by itself; presumably every access is made with m_mutex held - confirm
    // in the implementation file.
    volatile CPLWorkerThreadState eState{CPLWTS_OK};

    // Queued tasks and a job counter (presumably jobs submitted but not yet
    // declared finished through DeclareJobFinished()).
    std::queue<std::function<void()>> jobQueue{};
    int nPendingJobs{0};

    // CPLList of waiting worker threads, and a count that presumably mirrors
    // its length.
    CPLList *psWaitingWorkerThreadsList{nullptr};
    int nWaitingWorkerThreads{0};

    // Thread-count limit; judging by its name, the maximum number of threads
    // the pool may run - verify against the implementation.
    int m_nMaxThreads{0};

    // Thread entry point; user_data is an opaque pointer (presumably the
    // CPLWorkerThread record of the thread being started).
    static void WorkerThreadFunction(void *user_data);

    // Internal job bookkeeping helpers, defined in the implementation file.
    void DeclareJobFinished();
    std::function<void()> GetNextJob(CPLWorkerThread *psWorkerThread);

  public:
    /** Default constructor. */
    CPLWorkerThreadPool();

    /** Constructor taking a thread count.
     *
     * @param nThreads thread count (presumably the maximum number of threads
     *                 the pool may run - see the implementation).
     */
    explicit CPLWorkerThreadPool(int nThreads);

    /** Destructor. */
    ~CPLWorkerThreadPool();

    /** Set up the pool.
     *
     * @param nThreads    number of threads.
     * @param pfnInitFunc per-thread initialization callback (presumably may
     *                    be null - confirm in the implementation).
     * @param pasInitData array of initialization data pointers (presumably
     *                    one entry per thread).
     * @return a boolean status, presumably true on success.
     */
    bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData);

    /** Set up the pool; same parameters as the three-argument overload.
     *
     * @param nThreads        number of threads.
     * @param pfnInitFunc     per-thread initialization callback.
     * @param pasInitData     array of initialization data pointers.
     * @param bWaitallStarted judging by its name, whether to wait until all
     *                        threads have started before returning.
     * @return a boolean status, presumably true on success.
     */
    bool Setup(int nThreads, CPLThreadFunc pfnInitFunc, void **pasInitData,
               bool bWaitallStarted);

    /** Create a job queue, returned as an owning CPLJobQueuePtr. */
    CPLJobQueuePtr CreateJobQueue();

    /** Submit a job given as a callable taking no argument.
     *
     * @param task the job to run.
     * @return a boolean status, presumably true if the job was accepted.
     */
    bool SubmitJob(std::function<void()> task);

    /** Submit a job given as a callback and its user-data pointer.
     *
     * @param pfnFunc callback.
     * @param pData   opaque pointer, presumably handed to pfnFunc.
     * @return a boolean status, presumably true if the job was accepted.
     */
    bool SubmitJob(CPLThreadFunc pfnFunc, void *pData);

    /** Submit several jobs sharing one callback.
     *
     * @param pfnFunc callback.
     * @param apData  user-data pointers (presumably one job per element).
     * @return a boolean status, presumably true if the jobs were accepted.
     */
    bool SubmitJobs(CPLThreadFunc pfnFunc, const std::vector<void *> &apData);

    /** Wait for submitted jobs to complete.
     *
     * @param nMaxRemainingJobs defaults to 0; presumably the call returns
     *                          once at most that many jobs remain pending.
     */
    void WaitCompletion(int nMaxRemainingJobs = 0);

    /** Wait for an event (presumably the completion of a job - see the
     * implementation). */
    void WaitEvent();

    /** Return the number of threads setup */
    int GetThreadCount() const;
};

/** Job queue */
/** Job queue.
 *
 * A queue object tied to a CPLWorkerThreadPool. It cannot be copied, and its
 * constructor is protected (CPLWorkerThreadPool is a friend), so instances
 * are expected to come from CPLWorkerThreadPool::CreateJobQueue().
 */
class CPL_DLL CPLJobQueue
{
    CPL_DISALLOW_COPY_ASSIGN(CPLJobQueue)

    // Non-owning pointer to the pool, exposed through GetPool().
    CPLWorkerThreadPool *m_poPool{nullptr};

    // Mutex / condition variable private to this queue.
    std::mutex m_mutex{};
    std::condition_variable m_cv{};

    // Job counter (presumably jobs submitted through this queue that have
    // not yet been declared finished).
    int m_nPendingJobs{0};

    // Internal bookkeeping helper, defined in the implementation file.
    void DeclareJobFinished();

    //! @cond Doxygen_Suppress
  protected:
    friend class CPLWorkerThreadPool;
    explicit CPLJobQueue(CPLWorkerThreadPool *poPool);
    //! @endcond

  public:
    /** Destructor. */
    ~CPLJobQueue();

    /** Return the owning worker thread pool */
    CPLWorkerThreadPool *GetPool()
    {
        return m_poPool;
    }

    /** Submit a job given as a callback and its user-data pointer.
     *
     * @param pfnFunc callback.
     * @param pData   opaque pointer, presumably handed to pfnFunc.
     * @return a boolean status, presumably true if the job was accepted.
     */
    bool SubmitJob(CPLThreadFunc pfnFunc, void *pData);

    /** Submit a job given as a callable taking no argument.
     *
     * @param task the job to run.
     * @return a boolean status, presumably true if the job was accepted.
     */
    bool SubmitJob(std::function<void()> task);

    /** Wait for jobs of this queue to complete.
     *
     * @param nMaxRemainingJobs defaults to 0; presumably the call returns
     *                          once at most that many jobs remain pending.
     */
    void WaitCompletion(int nMaxRemainingJobs = 0);

    /** Wait for an event on this queue.
     *
     * @return a boolean whose meaning is defined by the implementation
     *         (not visible in this header).
     */
    bool WaitEvent();
};

#endif  // CPL_WORKER_THREAD_POOL_H_INCLUDED_