// SPDX-FileCopyrightText: Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
// SPDX-License-Identifier: BSD-3-Clause
/**
* @class vtkThreadedTaskQueue
* @brief simple threaded task queue
*
* vtkThreadedTaskQueue provides a simple task queue that can use threads to
* execute individual tasks. It is intended for use in applications such as data
* compression, encoding, etc., where the task may be completed concurrently
* without blocking the main thread.
*
* vtkThreadedTaskQueue's API is intended to be called from the same main thread.
* The constructor defines the work (or task) to be performed. `Push` allows the
* caller to enqueue a task with specified input arguments. The call will return
* immediately without blocking. The task is enqueued and will be executed
* concurrently when resources become available. `Pop` will block until the
* result is available. To avoid waiting for results to be available, use
* `TryPop`.
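*
* A minimal usage sketch of the push/pop cycle described above. The worker
* lambda and the values pushed are illustrative assumptions, not part of VTK.
* Since `Push` takes `Args&&...`, each argument has to be passed as an rvalue.
*
* @code{.cpp}
* #include "vtkThreadedTaskQueue.h"
* #include <iostream>
*
* int main()
* {
*   // Hypothetical worker: squares its input on a worker thread.
*   vtkThreadedTaskQueue<int, int> queue([](int x) { return x * x; });
*
*   for (int i = 0; i < 4; ++i)
*   {
*     queue.Push(int(i)); // returns immediately; int(i) yields an rvalue
*   }
*
*   // Drain the queue; Pop blocks until a result is available.
*   int result = 0;
*   while (!queue.IsEmpty())
*   {
*     if (queue.Pop(result))
*     {
*       std::cout << result << std::endl;
*     }
*   }
*   return 0;
* }
* @endcode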
*
* The constructor arguments allow customizing the queue. `strict_ordering`
* implies that results should be popped in the same order that tasks were
* pushed without dropping any task. If the caller is only concerned with
* obtaining the latest available result where intermediate results that take
* longer to compute may be dropped, then `strict_ordering` can be set to `false`.
*
* `max_concurrent_tasks` controls how many threads are used to process tasks in
* the queue. The default is the same as
* `vtkMultiThreader::GetGlobalDefaultNumberOfThreads()`.
*
* `buffer_size` indicates how many tasks may be queued for processing. Default
* is infinite size. If a positive number is provided, then pushing additional
* tasks will result in discarding of older tasks that haven't begun processing
* from the queue. Note, this does not impact tasks that may already be in
* progress. Also, if `strict_ordering` is true, this is ignored; the
* buffer_size will be set to unlimited.
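*
* The following sketch illustrates the "latest result" configuration, assuming
* a hypothetical `frame` index as the task input. With `strict_ordering` set to
* `false` and a `buffer_size` of 1, queued tasks that have not started may be
* discarded when newer ones are pushed, so not every pushed frame need produce
* a result.
*
* @code{.cpp}
* #include "vtkThreadedTaskQueue.h"
*
* int main()
* {
*   // Hypothetical worker standing in for an expensive encode step.
*   // Arguments: worker, strict_ordering, buffer_size, max_concurrent_tasks.
*   vtkThreadedTaskQueue<int, int> queue(
*     [](int frame) { return frame; }, false, 1, 2);
*
*   int latest = -1;
*   for (int frame = 0; frame < 100; ++frame)
*   {
*     queue.Push(int(frame));
*     // Non-blocking poll: pick up a finished result if there is one.
*     queue.TryPop(latest);
*   }
*
*   // Wait for whatever is still pending before the queue is destroyed.
*   while (!queue.IsEmpty())
*   {
*     queue.Pop(latest);
*   }
*   return 0;
* }
* @endcode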
*
*/
#ifndef vtkThreadedTaskQueue_h
#define vtkThreadedTaskQueue_h
#include "vtkObject.h"
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#if !defined(__WRAP__)
namespace vtkThreadedTaskQueueInternals
{
VTK_ABI_NAMESPACE_BEGIN
template <typename R>
class TaskQueue;
template <typename R>
class ResultQueue;
VTK_ABI_NAMESPACE_END
}
VTK_ABI_NAMESPACE_BEGIN
template <typename R, typename... Args>
class vtkThreadedTaskQueue
{
public:
  vtkThreadedTaskQueue(std::function<R(Args...)> worker, bool strict_ordering = true,
    int buffer_size = -1, int max_concurrent_tasks = -1);
  ~vtkThreadedTaskQueue();

  /**
   * Push arguments for the work.
   */
  void Push(Args&&... args);

  /**
   * Pop the last result. Returns true on success. May fail if called on an
   * empty queue. This will wait for a result to become available.
   */
  bool Pop(R& result);

  /**
   * Attempt to pop without waiting. If no results are available, returns
   * false.
   */
  bool TryPop(R& result);

  /**
   * Returns false if there's some result that may be popped right now or in the
   * future.
   */
  bool IsEmpty() const;

  /**
   * Blocks until the queue becomes empty.
   */
  void Flush();

private:
  vtkThreadedTaskQueue(const vtkThreadedTaskQueue&) = delete;
  void operator=(const vtkThreadedTaskQueue&) = delete;

  std::function<R(Args...)> Worker;
  std::unique_ptr<vtkThreadedTaskQueueInternals::TaskQueue<R>> Tasks;
  std::unique_ptr<vtkThreadedTaskQueueInternals::ResultQueue<R>> Results;

  int NumberOfThreads;
  std::unique_ptr<std::thread[]> Threads;
};
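
/**
 * Specialization for workers that return `void`. It declares no `Pop` or
 * `TryPop`; completion is observed through `IsEmpty` and `Flush`.
 *
 * A minimal sketch, assuming a hypothetical accumulating task. The `total`
 * counter and the values pushed are illustrative only.
 *
 * @code{.cpp}
 * #include "vtkThreadedTaskQueue.h"
 * #include <atomic>
 * #include <iostream>
 *
 * int main()
 * {
 *   std::atomic<int> total(0);
 *   // Hypothetical worker: adds its argument to a shared atomic counter.
 *   vtkThreadedTaskQueue<void, int> queue([&total](int x) { total += x; });
 *
 *   for (int i = 1; i <= 10; ++i)
 *   {
 *     queue.Push(int(i)); // Push takes Args&&, so pass an rvalue
 *   }
 *
 *   queue.Flush(); // blocks until the queue becomes empty
 *   std::cout << total.load() << std::endl;
 *   return 0;
 * }
 * @endcode
 */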
template <typename... Args>
class vtkThreadedTaskQueue<void, Args...>
{
public:
  vtkThreadedTaskQueue(std::function<void(Args...)> worker, bool strict_ordering = true,
    int buffer_size = -1, int max_concurrent_tasks = -1);
  ~vtkThreadedTaskQueue();

  /**
   * Push arguments for the work.
   */
  void Push(Args&&... args);

  /**
   * Returns false if there's some task that is still pending or in progress.
   */
  bool IsEmpty() const;

  /**
   * Blocks until the queue becomes empty.
   */
  void Flush();

private:
  vtkThreadedTaskQueue(const vtkThreadedTaskQueue&) = delete;
  void operator=(const vtkThreadedTaskQueue&) = delete;

  std::function<void(Args...)> Worker;
  std::unique_ptr<vtkThreadedTaskQueueInternals::TaskQueue<void>> Tasks;
  std::condition_variable ResultsCV;
  std::mutex NextResultIdMutex;
  std::atomic<std::uint64_t> NextResultId;

  int NumberOfThreads;
  std::unique_ptr<std::thread[]> Threads;
};
VTK_ABI_NAMESPACE_END
#include "vtkThreadedTaskQueue.txx"
#endif // !defined(__WRAP__)
#endif
// VTK-HeaderTest-Exclude: vtkThreadedTaskQueue.h