File: vtkThreadedTaskQueue.h

package info (click to toggle)
paraview 5.9.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 367,928 kB
  • sloc: cpp: 2,870,477; ansic: 1,329,317; python: 132,607; xml: 98,045; sh: 5,265; java: 4,541; yacc: 4,385; f90: 3,099; perl: 2,363; lex: 1,950; javascript: 1,574; objc: 143; makefile: 135; tcl: 59; pascal: 50; fortran: 27
file content (167 lines) | stat: -rw-r--r-- 5,081 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/*=========================================================================

  Program:   Visualization Toolkit
  Module:    vtkThreadedTaskQueue.h

  Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
  All rights reserved.
  See Copyright.txt or http://www.kitware.com/Copyright.htm for details.

     This software is distributed WITHOUT ANY WARRANTY; without even
     the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
     PURPOSE.  See the above copyright notice for more information.

=========================================================================*/
/**
 * @class vtkThreadedTaskQueue
 * @brief simple threaded task queue
 *
 * vtkThreadedTaskQueue provides a simple task queue that can use threads to
 * execute individual tasks. It is intended for use in applications such as data
 * compression, encoding etc. where the task may be completed concurrently
 * without blocking the main thread.
 *
 * vtkThreadedTaskQueue's API is intended to be called from the same main thread.
 * The constructor defines the work (or task) to be performed. `Push` allows the
 * caller to enqueue a task with specified input arguments. The call will return
 * immediately without blocking. The task is enqueued and will be executed
 * concurrently when resources become available.  `Pop` will block until the
 * result is available. To avoid waiting for results to be available, use
 * `TryPop`.
 *
 * The constructor provides options to customize the queue. `strict_ordering`
 * implies that results should be popped in the same order that tasks were
 * pushed without dropping any task. If the caller is only concerned with
 * obtaining the latest available result where intermediate results that take
 * longer to compute may be dropped, then `strict_ordering` can be set to `false`.
 *
 * `max_concurrent_tasks` controls how many threads are used to process tasks in
 * the queue. Default is same as
 * `vtkMultiThreader::GetGlobalDefaultNumberOfThreads()`.
 *
 * `buffer_size` indicates how many tasks may be queued for processing. Default
 * is infinite size. If a positive number is provided, then pushing additional
 * tasks will result in discarding of older tasks that haven't begun processing
 * from the queue. Note, this does not impact tasks that may already be in
 * progress. Also, if `strict_ordering` is true, this is ignored; the
 * buffer_size will be set to unlimited.
 *
 */

#ifndef vtkThreadedTaskQueue_h
#define vtkThreadedTaskQueue_h

#include "vtkObject.h"
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>

#if !defined(__WRAP__)
/**
 * Forward declarations of the internal queue types used by
 * vtkThreadedTaskQueue. This header only holds them through
 * `std::unique_ptr`, so incomplete types suffice here; the full definitions
 * are presumably provided by vtkThreadedTaskQueue.txx (included below).
 */
namespace vtkThreadedTaskQueueInternals
{
// Queue of pending tasks, parameterized on the worker's return type.
template <typename R>
class TaskQueue;

// Queue of results of type R produced by the worker.
template <typename R>
class ResultQueue;
} // namespace vtkThreadedTaskQueueInternals

/**
 * Primary template: a task queue whose worker is a callable taking `Args...`
 * and returning a result of type `R`.
 *
 * Only declarations appear here; the member definitions are presumably
 * provided by vtkThreadedTaskQueue.txx, which this header includes.
 */
template <typename R, typename... Args>
class vtkThreadedTaskQueue
{
public:
  /**
   * Create a queue that uses `worker` to process each pushed task.
   *
   * Per the file-level documentation:
   * - `strict_ordering`: when true, results are popped in the same order the
   *   tasks were pushed and no task is dropped; when false, intermediate
   *   results may be dropped in favor of the latest available one.
   * - `buffer_size`: how many tasks may be queued for processing. The default
   *   is unlimited; a positive value causes older, not-yet-started tasks to be
   *   discarded when more are pushed. Ignored (treated as unlimited) when
   *   `strict_ordering` is true.
   * - `max_concurrent_tasks`: how many threads process tasks. The default is
   *   documented as `vtkMultiThreader::GetGlobalDefaultNumberOfThreads()`.
   */
  vtkThreadedTaskQueue(std::function<R(Args...)> worker, bool strict_ordering = true,
    int buffer_size = -1, int max_concurrent_tasks = -1);
  ~vtkThreadedTaskQueue();

  /**
   * Push arguments for the work, i.e. enqueue one task.
   *
   * Note that `Args` is a class template parameter, so `Args&&` is NOT a
   * forwarding reference: for non-reference types in `Args`, the arguments
   * must be rvalues (temporaries or `std::move`'d values).
   */
  void Push(Args&&... args);

  /**
   * Pop the last result into `result`. Returns true on success. May fail if
   * called on an empty queue. This will wait for a result to be available.
   */
  bool Pop(R& result);

  /**
   * Attempt to pop into `result` without waiting. If no results are
   * available, returns false.
   */
  bool TryPop(R& result);

  /**
   * Returns false if there's some result that may be popped right now or in the
   * future.
   */
  bool IsEmpty() const;

  /**
   * Blocks till the queue becomes empty.
   */
  void Flush();

private:
  // Copying is disabled.
  vtkThreadedTaskQueue(const vtkThreadedTaskQueue&) = delete;
  void operator=(const vtkThreadedTaskQueue&) = delete;

  // Callable invoked for each task; presumably the `worker` passed to the
  // constructor.
  std::function<R(Args...)> Worker;

  // Internal queues, held by pointer because only forward declarations of
  // their types are visible in this header.
  std::unique_ptr<vtkThreadedTaskQueueInternals::TaskQueue<R>> Tasks;
  std::unique_ptr<vtkThreadedTaskQueueInternals::ResultQueue<R>> Results;

  // Thread storage; NumberOfThreads is presumably the length of the Threads
  // array -- confirm against the .txx.
  int NumberOfThreads;
  std::unique_ptr<std::thread[]> Threads;
};

/**
 * Partial specialization for workers that return `void`.
 *
 * Since there is no result value, this specialization declares no `Pop` or
 * `TryPop`; callers can only push tasks, query `IsEmpty`, and `Flush`.
 * Only declarations appear here; the member definitions are presumably
 * provided by vtkThreadedTaskQueue.txx, which this header includes.
 */
template <typename... Args>
class vtkThreadedTaskQueue<void, Args...>
{
public:
  /**
   * Create a queue that uses `worker` to process each pushed task.
   *
   * Per the file-level documentation:
   * - `strict_ordering`: when true, tasks are handled in push order without
   *   dropping any; when false, intermediate tasks may be dropped.
   * - `buffer_size`: how many tasks may be queued for processing. The default
   *   is unlimited; a positive value causes older, not-yet-started tasks to be
   *   discarded when more are pushed. Ignored (treated as unlimited) when
   *   `strict_ordering` is true.
   * - `max_concurrent_tasks`: how many threads process tasks. The default is
   *   documented as `vtkMultiThreader::GetGlobalDefaultNumberOfThreads()`.
   */
  vtkThreadedTaskQueue(std::function<void(Args...)> worker, bool strict_ordering = true,
    int buffer_size = -1, int max_concurrent_tasks = -1);
  ~vtkThreadedTaskQueue();

  /**
   * Push arguments for the work, i.e. enqueue one task.
   *
   * Note that `Args` is a class template parameter, so `Args&&` is NOT a
   * forwarding reference: for non-reference types in `Args`, the arguments
   * must be rvalues (temporaries or `std::move`'d values).
   */
  void Push(Args&&... args);

  /**
   * Returns false if there's some result that may be popped right now or in the
   * future.
   *
   * NOTE(review): this specialization has no Pop/TryPop, so "result" here
   * presumably means a pushed task that has not completed yet -- confirm
   * against the implementation in the .txx.
   */
  bool IsEmpty() const;

  /**
   * Blocks till the queue becomes empty.
   */
  void Flush();

private:
  // Copying is disabled.
  vtkThreadedTaskQueue(const vtkThreadedTaskQueue&) = delete;
  void operator=(const vtkThreadedTaskQueue&) = delete;

  // Callable invoked for each task; presumably the `worker` passed to the
  // constructor.
  std::function<void(Args...)> Worker;

  // Internal task queue, held by pointer because only a forward declaration
  // of its type is visible in this header.
  std::unique_ptr<vtkThreadedTaskQueueInternals::TaskQueue<void>> Tasks;

  // Completion bookkeeping used in place of a result queue. Presumably
  // NextResultId tracks how many tasks have completed, with ResultsCV and
  // NextResultIdMutex used to wait on / signal changes to it -- confirm
  // against the .txx.
  std::condition_variable ResultsCV;
  std::mutex NextResultIdMutex;
  std::atomic<std::uint64_t> NextResultId;

  // Thread storage; NumberOfThreads is presumably the length of the Threads
  // array -- confirm against the .txx.
  int NumberOfThreads;
  std::unique_ptr<std::thread[]> Threads;
};

#include "vtkThreadedTaskQueue.txx"

#endif // !defined(__WRAP__)

#endif
// VTK-HeaderTest-Exclude: vtkThreadedTaskQueue.h