File: vtkThreadedTaskQueue.h

package info (click to toggle)
vtk9 9.3.0%2Bdfsg1-4
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 267,116 kB
  • sloc: cpp: 2,195,914; ansic: 285,452; python: 104,858; sh: 4,061; yacc: 4,035; java: 3,977; xml: 2,771; perl: 2,189; lex: 1,762; objc: 153; makefile: 150; javascript: 90; tcl: 59
file content (160 lines) | stat: -rw-r--r-- 4,706 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// SPDX-FileCopyrightText: Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
// SPDX-License-Identifier: BSD-3-Clause
/**
 * @class vtkThreadedTaskQueue
 * @brief simple threaded task queue
 *
 * vtkThreadedTaskQueue provides a simple task queue that can use threads to
 * execute individual tasks. It is intended for use in applications such as data
 * compression, encoding etc. where the task may be completed concurrently
 * without blocking the main thread.
 *
 * vtkThreadedTaskQueue's API is intended to be called from the same main thread.
 * The constructor defines the work (or task) to be performed. `Push` allows the
 * caller to enqueue a task with specified input arguments. The call will return
 * immediately without blocking. The task is enqueued and will be executed
 * concurrently when resources become available.  `Pop` will block until the
 * result is available. To avoid waiting for results to be available, use
 * `TryPop`.
 *
 * The constructor provides mechanisms to customize the queue. `strict_ordering`
 * implies that results should be popped in the same order that tasks were
 * pushed without dropping any task. If the caller is only concerned with
 * obtaining the latest available result where intermediate results that take
 * longer to compute may be dropped, then `strict_ordering` can be set to `false`.
 *
 * `max_concurrent_tasks` controls how many threads are used to process tasks in
 * the queue. Default is same as
 * `vtkMultiThreader::GetGlobalDefaultNumberOfThreads()`.
 *
 * `buffer_size` indicates how many tasks may be queued for processing. Default
 * is infinite size. If a positive number is provided, then pushing additional
 * tasks will result in discarding of older tasks that haven't begun processing
 * from the queue. Note, this does not impact tasks that may already be in
 * progress. Also, if `strict_ordering` is true, this is ignored; the
 * buffer_size will be set to unlimited.
 *
 */

#ifndef vtkThreadedTaskQueue_h
#define vtkThreadedTaskQueue_h

#include "vtkObject.h"
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>

#if !defined(__WRAP__)
// Forward declarations of the helper queue templates that the
// vtkThreadedTaskQueue classes in this header hold through std::unique_ptr
// members. Only the names are introduced here.
// NOTE(review): the definitions are not visible in this header; presumably
// they come from vtkThreadedTaskQueue.txx, which is included further down --
// confirm.
namespace vtkThreadedTaskQueueInternals
{
VTK_ABI_NAMESPACE_BEGIN

// Type of the `Tasks` member of vtkThreadedTaskQueue.
template <typename ResultT>
class TaskQueue;

// Type of the `Results` member of the non-void vtkThreadedTaskQueue.
template <typename ResultT>
class ResultQueue;

VTK_ABI_NAMESPACE_END
}

VTK_ABI_NAMESPACE_BEGIN

// General form of the queue, for workers that produce a result of type `R`.
// `Args...` are the argument types taken by the worker and by `Push`.
template <typename R, typename... Args>
class vtkThreadedTaskQueue
{
public:
  /**
   * Create a queue that performs `worker` for every set of pushed arguments.
   *
   * @param worker the work (or task) to be performed.
   * @param strict_ordering when true, results should be popped in the same
   *   order that tasks were pushed, without dropping any task. Pass false if
   *   only the latest available result matters and intermediate results that
   *   take longer to compute may be dropped.
   * @param buffer_size how many tasks may be queued for processing. The
   *   default means an infinite size. With a positive number, pushing
   *   additional tasks discards older tasks that haven't begun processing;
   *   tasks already in progress are not affected. Ignored (treated as
   *   unlimited) when `strict_ordering` is true.
   * @param max_concurrent_tasks how many threads are used to process tasks.
   *   The default is the same as
   *   `vtkMultiThreader::GetGlobalDefaultNumberOfThreads()`.
   */
  vtkThreadedTaskQueue(std::function<R(Args...)> worker, bool strict_ordering = true,
    int buffer_size = -1, int max_concurrent_tasks = -1);
  ~vtkThreadedTaskQueue();

  /**
   * Enqueue a task with the given input arguments. The call returns
   * immediately without blocking; the task is executed concurrently when
   * resources become available.
   */
  void Push(Args&&... args);

  /**
   * Pop the last result into `result`, waiting for a result to become
   * available. Returns true on success. May fail if called on an empty queue.
   */
  bool Pop(R& result);

  /**
   * Non-waiting variant of `Pop`. If no results are available, returns false.
   */
  bool TryPop(R& result);

  /**
   * Returns false if there's some result that may be popped right now or in
   * the future.
   */
  bool IsEmpty() const;

  /**
   * Blocks till the queue becomes empty.
   */
  void Flush();

private:
  // The task callable; its type matches the constructor's `worker` argument.
  std::function<R(Args...)> Worker;

  // Helper queues, declared in vtkThreadedTaskQueueInternals.
  std::unique_ptr<vtkThreadedTaskQueueInternals::TaskQueue<R>> Tasks;
  std::unique_ptr<vtkThreadedTaskQueueInternals::ResultQueue<R>> Results;

  // NOTE(review): presumably `NumberOfThreads` is the length of the `Threads`
  // array -- confirm against the constructor definition.
  int NumberOfThreads;
  std::unique_ptr<std::thread[]> Threads;

  // Copying is disabled: both operations are deleted.
  vtkThreadedTaskQueue(const vtkThreadedTaskQueue&) = delete;
  void operator=(const vtkThreadedTaskQueue&) = delete;
};

// Specialization for workers that return nothing (`void`). It offers no
// `Pop`/`TryPop` and holds no result queue member.
template <typename... Args>
class vtkThreadedTaskQueue<void, Args...>
{
public:
  /**
   * Create a queue that performs `worker` for every set of pushed arguments.
   *
   * @param worker the work (or task) to be performed.
   * @param strict_ordering ordering policy of the queue; when true,
   *   `buffer_size` is ignored and treated as unlimited.
   * @param buffer_size how many tasks may be queued for processing. The
   *   default means an infinite size. With a positive number, pushing
   *   additional tasks discards older tasks that haven't begun processing;
   *   tasks already in progress are not affected.
   * @param max_concurrent_tasks how many threads are used to process tasks.
   *   The default is the same as
   *   `vtkMultiThreader::GetGlobalDefaultNumberOfThreads()`.
   */
  vtkThreadedTaskQueue(std::function<void(Args...)> worker, bool strict_ordering = true,
    int buffer_size = -1, int max_concurrent_tasks = -1);
  ~vtkThreadedTaskQueue();

  /**
   * Enqueue a task with the given input arguments. The call returns
   * immediately without blocking; the task is executed concurrently when
   * resources become available.
   */
  void Push(Args&&... args);

  /**
   * Returns false if there's some result still outstanding, right now or in
   * the future.
   * NOTE(review): this specialization exposes no `Pop`, so "result" here
   * presumably means a pushed task that has not completed yet -- confirm
   * against the implementation.
   */
  bool IsEmpty() const;

  /**
   * Blocks till the queue becomes empty.
   */
  void Flush();

private:
  // The task callable; its type matches the constructor's `worker` argument.
  std::function<void(Args...)> Worker;

  // Helper queue, declared in vtkThreadedTaskQueueInternals.
  std::unique_ptr<vtkThreadedTaskQueueInternals::TaskQueue<void>> Tasks;

  // NOTE(review): these three members take the place of the result queue held
  // by the non-void class; presumably they track task completion for
  // `IsEmpty`/`Flush` -- confirm against the implementation.
  std::condition_variable ResultsCV;
  std::mutex NextResultIdMutex;
  std::atomic<std::uint64_t> NextResultId;

  // NOTE(review): presumably `NumberOfThreads` is the length of the `Threads`
  // array -- confirm against the constructor definition.
  int NumberOfThreads;
  std::unique_ptr<std::thread[]> Threads;

  // Copying is disabled: both operations are deleted.
  vtkThreadedTaskQueue(const vtkThreadedTaskQueue&) = delete;
  void operator=(const vtkThreadedTaskQueue&) = delete;
};

VTK_ABI_NAMESPACE_END
#include "vtkThreadedTaskQueue.txx"

#endif // !defined(__WRAP__)

#endif
// VTK-HeaderTest-Exclude: vtkThreadedTaskQueue.h