File: vtkSMPThreadPool.cxx

// SPDX-FileCopyrightText: Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
// SPDX-License-Identifier: BSD-3-Clause

#include "SMP/STDThread/vtkSMPThreadPool.h"

#include <vtkObject.h>

#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <future>
#include <iostream>

namespace vtk
{
namespace detail
{
namespace smp
{
VTK_ABI_NAMESPACE_BEGIN

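// Sentinel index used when a thread has no job from this pool currently running.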
static constexpr std::size_t NoRunningJob = (std::numeric_limits<std::size_t>::max)();

struct vtkSMPThreadPool::ThreadJob
{
  // This constructor is needed because aggregate initialization cannot have default values
  // (prior to C++14)
  // and because emplace_back cannot use aggregate initialization (prior to C++20)
  ThreadJob(ProxyData* proxy = nullptr, std::function<void()> function = nullptr)
    : Proxy{ proxy }
    , Function{ std::move(function) }
  {
  }

  ProxyData* Proxy{};               // Proxy that allocated this job
  std::function<void()> Function{}; // Actual user job
  std::promise<void> Promise{};     // Set when job is done
};

struct vtkSMPThreadPool::ThreadData
{
  // Stack of jobs; any thread may push (and only push) jobs, and Mutex must be locked to do so
  std::vector<ThreadJob> Jobs{};
  // Current job (used to map this thread to a Proxy). Using an index is okay because only this
  // thread can erase a job, while other threads can only push back new jobs, not insert. This
  // constraint could be relaxed by using unique ids instead.
  std::size_t RunningJob{ NoRunningJob };
  std::thread SystemThread{};                  // the system thread, not really used
  std::mutex Mutex{};                          // thread mutex, used for Jobs manipulation
  std::condition_variable ConditionVariable{}; // thread cv, used to wake up the thread
};

struct vtkSMPThreadPool::ProxyThreadData
{
  // This constructor is needed because aggregate initialization cannot have default values
  // (prior to C++14)
  // and because emplace_back cannot use aggregate initialization (prior to C++20)
  ProxyThreadData(ThreadData* threadData = nullptr, std::size_t id = 0)
    : Thread{ threadData }
    , Id{ id }
  {
  }

  ThreadData* Thread{}; // The thread data from the pool
  std::size_t Id{};     // Virtual thread ID, mainly used for thread local variables
};

struct vtkSMPThreadPool::ProxyData
{
  vtkSMPThreadPool* Pool{};                     // Pool that created this proxy
  ProxyData* Parent{};                          // either null (for top level) or the parent
  std::vector<ProxyThreadData> Threads{};       // Threads used by this proxy
  std::size_t NextThread{};                     // Round-robin thread for jobs
  std::vector<std::future<void>> JobsFutures{}; // Used to know when job is done
  std::mutex Mutex{};                           // Used to synchronize
};

void vtkSMPThreadPool::RunJob(
  ThreadData& data, std::size_t jobIndex, std::unique_lock<std::mutex>& lock)
{
  assert(lock.owns_lock() && "Caller must have locked mutex");
  assert(jobIndex < data.Jobs.size() && "jobIndex out of range");

  const auto oldRunningJob = data.RunningJob; // store old running job for nested threads
  data.RunningJob = jobIndex;                 // Set thread running job
  auto function = std::move(data.Jobs[data.RunningJob].Function);
  lock.unlock(); // MSVC: warning C26110 is a false positive

  try
  {
    function(); // run the function
  }
  catch (const std::exception& e)
  {
    vtkErrorWithObjectMacro(nullptr,
      "Function called by " << vtkSMPThreadPool::GetInstance().GetThreadId()
                            << " has thrown an exception. The exception is ignored. what():\n"
                            << e.what());
  }
  catch (...)
  {
    vtkErrorWithObjectMacro(nullptr,
      "Function called by " << vtkSMPThreadPool::GetInstance().GetThreadId()
                            << " has thrown an unknown exception. The exception is ignored.");
  }

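  // Re-acquire the lock before mutating Jobs: other threads may have pushed new jobs while the
  // user function was running.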
  lock.lock();
  data.Jobs[data.RunningJob].Promise.set_value();
  data.Jobs.erase(data.Jobs.begin() + jobIndex);
  data.RunningJob = oldRunningJob;
}

vtkSMPThreadPool::Proxy::Proxy(std::unique_ptr<ProxyData>&& data)
  : Data{ std::move(data) }
{
}

vtkSMPThreadPool::Proxy::~Proxy()
{
  if (!this->Data->JobsFutures.empty())
  {
    vtkErrorWithObjectMacro(nullptr, "Proxy not joined. Terminating.");
    std::terminate();
  }
}

vtkSMPThreadPool::Proxy::Proxy(Proxy&&) noexcept = default;
vtkSMPThreadPool::Proxy& vtkSMPThreadPool::Proxy::operator=(Proxy&&) noexcept = default;

void vtkSMPThreadPool::Proxy::Join()
{
  if (this->IsTopLevel()) // wait for all futures, all jobs are done by other threads
  {
    for (auto& future : this->Data->JobsFutures)
    {
      future.wait();
    }
  }
  else // nested run code in calling thread too
  {
    // Run jobs associated with this thread and proxy
    ThreadData& threadData = *this->Data->Threads[0].Thread;
    assert(threadData.SystemThread.get_id() == std::this_thread::get_id());

    while (true)
    {
      // protect access in case another thread pushes work for the current thread
      std::unique_lock<std::mutex> lock{ threadData.Mutex };

      auto it = std::find_if(threadData.Jobs.begin(), threadData.Jobs.end(),
        [this](ThreadJob& job) { return job.Proxy == this->Data.get(); });

      if (it == threadData.Jobs.end()) // no remaining job associated to this proxy
      {
        break;
      }

      const auto jobIndex = static_cast<std::size_t>(std::distance(threadData.Jobs.begin(), it));
      RunJob(threadData, jobIndex, lock);
    }

    for (auto& future : this->Data->JobsFutures)
    {
      future.wait();
    }
  }

  this->Data->JobsFutures.clear();
}

void vtkSMPThreadPool::Proxy::DoJob(std::function<void()> job)
{
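  // Pick the next thread in round-robin order; when nested, index 0 is the calling thread itself.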
  this->Data->NextThread = (this->Data->NextThread + 1) % this->Data->Threads.size();
  auto& proxyThread = this->Data->Threads[this->Data->NextThread];

  if (!this->IsTopLevel() && this->Data->NextThread == 0) // when nested, thread 0 is "this_thread"
  {
    assert(std::this_thread::get_id() == proxyThread.Thread->SystemThread.get_id());

    std::unique_lock<std::mutex> lock{ proxyThread.Thread->Mutex };
    proxyThread.Thread->Jobs.emplace_back(this->Data.get(), std::move(job));
  }
  else
  {
    std::unique_lock<std::mutex> lock{ proxyThread.Thread->Mutex };

    auto& jobs = proxyThread.Thread->Jobs;
    jobs.emplace_back(this->Data.get(), std::move(job));
    this->Data->JobsFutures.emplace_back(jobs.back().Promise.get_future());

    lock.unlock();

    proxyThread.Thread->ConditionVariable.notify_one();
  }
}

std::vector<std::reference_wrapper<std::thread>> vtkSMPThreadPool::Proxy::GetThreads() const
{
  std::vector<std::reference_wrapper<std::thread>> output;

  for (auto& proxyThread : this->Data->Threads)
  {
    output.emplace_back(proxyThread.Thread->SystemThread);
  }

  return output;
}

bool vtkSMPThreadPool::Proxy::IsTopLevel() const noexcept
{
  return this->Data->Parent == nullptr;
}

vtkSMPThreadPool::vtkSMPThreadPool()
{
  const auto threadCount = static_cast<std::size_t>(std::thread::hardware_concurrency());

  this->Threads.reserve(threadCount);
  for (std::size_t i{}; i < threadCount; ++i)
  {
    std::unique_ptr<ThreadData> data{ new ThreadData{} };
    data->SystemThread = this->MakeThread();
    this->Threads.emplace_back(std::move(data));
  }

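  // Publish the fully populated thread list: worker threads spin on this flag in MakeThread()
  // before calling GetCallerThreadData().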
  this->Initialized.store(true, std::memory_order_release);
}

vtkSMPThreadPool::~vtkSMPThreadPool()
{
  this->Joining.store(true, std::memory_order_release);

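  // Wake every worker so it observes Joining and exits its main loop once it has no jobs left.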
  for (auto& threadData : this->Threads)
  {
    threadData->ConditionVariable.notify_one();
  }

  for (auto& threadData : this->Threads)
  {
    threadData->SystemThread.join();
  }
}

vtkSMPThreadPool::Proxy vtkSMPThreadPool::AllocateThreads(std::size_t threadCount)
{
  if (threadCount == 0 || threadCount > this->ThreadCount())
  {
    threadCount = this->ThreadCount();
  }

  std::unique_ptr<ProxyData> proxy{ new ProxyData{} };
  proxy->Pool = this;
  proxy->Threads.reserve(threadCount);

  // Check if we are in the pool
  ThreadData* threadData = this->GetCallerThreadData();
  if (threadData)
  {
    // No lock needed: we are inside the running job, on this thread
    proxy->Parent = threadData->Jobs[threadData->RunningJob].Proxy;
    // The first thread is always the current thread
    proxy->Threads.emplace_back(threadData, this->GetNextThreadId());
    this->FillThreadsForNestedProxy(proxy.get(), threadCount);
  }
  else
  {
    proxy->Parent = nullptr;
    for (std::size_t i{}; i < threadCount; ++i)
    {
      proxy->Threads.emplace_back(this->Threads[i].get(), this->GetNextThreadId());
    }
  }

  return Proxy{ std::move(proxy) };
}

std::size_t vtkSMPThreadPool::GetThreadId() const noexcept
{
  auto* threadData = this->GetCallerThreadData();

  if (threadData)
  {
    std::unique_lock<std::mutex> lock{ threadData->Mutex }; // protect threadData->Jobs access
    assert(threadData->RunningJob != NoRunningJob && "Invalid state");
    auto& proxyThreads = threadData->Jobs[threadData->RunningJob].Proxy->Threads;
    lock.unlock();

    for (const auto& proxyThread : proxyThreads)
    {
      if (proxyThread.Thread == threadData)
      {
        return proxyThread.Id;
      }
    }
  }

  // Use 1 for any thread outside the pool and 2+ for proxy thread ids, because the thread-local
  // implementation uses ID "0" for an invalid state
  return ExternalThreadID;
}

bool vtkSMPThreadPool::IsParallelScope() const noexcept
{
  return GetCallerThreadData() != nullptr;
}

bool vtkSMPThreadPool::GetSingleThread() const
{
  // Return true if the caller is thread[0] of the currently running proxy

  auto* threadData = GetCallerThreadData();
  if (threadData)
  {
    std::lock_guard<std::mutex> lock{ threadData->Mutex };
    assert(threadData->RunningJob != NoRunningJob && "Invalid state");
    return threadData->Jobs[threadData->RunningJob].Proxy->Threads[0].Thread == threadData;
  }

  return false;
}

std::size_t vtkSMPThreadPool::ThreadCount() const noexcept
{
  return this->Threads.size();
}

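// Return the ThreadData of the calling thread if it belongs to this pool, nullptr otherwise.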
vtkSMPThreadPool::ThreadData* vtkSMPThreadPool::GetCallerThreadData() const noexcept
{
  for (const auto& threadData : this->Threads)
  {
    if (threadData->SystemThread.get_id() == std::this_thread::get_id())
    {
      return threadData.get();
    }
  }

  return nullptr;
}

std::thread vtkSMPThreadPool::MakeThread()
{
  return std::thread{ [this]() {
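    // Busy-wait until the constructor has finished populating this->Threads so that
    // GetCallerThreadData() below can safely find this thread's entry.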
    while (!this->Initialized.load(std::memory_order_acquire))
    {
    }

    ThreadData& threadData = *this->GetCallerThreadData();

    // Main loop for the pool's threads.
    // When woken up, they check for new jobs and stop if "this->Joining" is true
    // and no more jobs remain.
    while (true)
    {
      std::unique_lock<std::mutex> lock{ threadData.Mutex };

      // Job stealing could be implemented, but it would require some changes to the process:
      // a thread that no longer has work to do could look at other threads' jobs and "steal" one
      // from them, thus increasing parallelism. This would have to avoid generating deadlocks
      // and should not increase a Proxy's parallelism above the requested thread count.
      // This is out of the scope of the current implementation.
      threadData.ConditionVariable.wait(lock, [this, &threadData] {
        return !threadData.Jobs.empty() || this->Joining.load(std::memory_order_acquire);
      });

      if (threadData.Jobs.empty())
      {
        break; // joining
      }

      RunJob(threadData, threadData.Jobs.size() - 1, lock);
    }
  } };
}

void vtkSMPThreadPool::FillThreadsForNestedProxy(ProxyData* proxy, std::size_t maxCount)
{
  // This function assigns threads to a proxy. It assumes that the calling thread is already
  // part of the threads assigned to the proxy; it then assigns pool threads that are not
  // already used by any of the proxy's parents.

  if (proxy->Parent->Threads.size() == this->Threads.size())
  {
    return; // No thread will be available
  }

  const auto isFree = [proxy](ThreadData* threadData) {
    for (auto* parent = proxy->Parent; parent != nullptr; parent = parent->Parent)
    {
      for (auto& proxyThread : parent->Threads)
      {
        if (proxyThread.Thread == threadData)
        {
          return false;
        }
      }
    }

    return true;
  };

  for (auto& threadData : this->Threads)
  {
    if (isFree(threadData.get()))
    {
      proxy->Threads.emplace_back(threadData.get(), this->GetNextThreadId());
    }

    if (proxy->Threads.size() == maxCount)
    {
      break;
    }
  }
}

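// Generate a unique virtual thread ID for proxy threads; see GetThreadId() for the ID scheme.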
std::size_t vtkSMPThreadPool::GetNextThreadId() noexcept
{
  return this->NextProxyThreadId.fetch_add(1, std::memory_order_relaxed) + 1;
}

vtkSMPThreadPool& vtkSMPThreadPool::GetInstance()
{
  static vtkSMPThreadPool instance{};
  return instance;
}
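
// Usage sketch (illustrative only): allocate a proxy from the singleton pool, submit jobs,
// then join before the proxy goes out of scope.
//
//   vtkSMPThreadPool& pool = vtkSMPThreadPool::GetInstance();
//   vtkSMPThreadPool::Proxy proxy = pool.AllocateThreads(4);
//   for (int i = 0; i < 8; ++i)
//   {
//     proxy.DoJob([i]() { /* per-chunk user work */ });
//   }
//   proxy.Join(); // required: the Proxy destructor terminates if unjoined futures remain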

VTK_ABI_NAMESPACE_END
} // namespace smp
} // namespace detail
} // namespace vtk