// Copyright (c) the JPEG XL Project Authors. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
#include "lib/threads/thread_parallel_runner_internal.h"

#include <jxl/parallel_runner.h>
#include <jxl/types.h>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <thread>

#include "lib/jxl/base/compiler_specific.h"
namespace jpegxl {
// static
JxlParallelRetCode ThreadParallelRunner::Runner(
void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init,
JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) {
ThreadParallelRunner* self =
static_cast<ThreadParallelRunner*>(runner_opaque);
if (start_range > end_range) return JXL_PARALLEL_RET_RUNNER_ERROR;
if (start_range == end_range) return JXL_PARALLEL_RET_SUCCESS;
int ret = init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1));
if (ret != JXL_PARALLEL_RET_SUCCESS) return ret;
// Use a sequential run when num_worker_threads_ is zero since we have no
// worker threads.
if (self->num_worker_threads_ == 0) {
const size_t thread = 0;
for (uint32_t task = start_range; task < end_range; ++task) {
func(jpegxl_opaque, task, thread);
}
return JXL_PARALLEL_RET_SUCCESS;
}
if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) {
return JXL_PARALLEL_RET_RUNNER_ERROR; // Must not re-enter.
}
const WorkerCommand worker_command =
(static_cast<WorkerCommand>(start_range) << 32) + end_range;
  // Ensure the inputs do not result in a reserved command.
  if ((worker_command == kWorkerWait) || (worker_command == kWorkerOnce) ||
      (worker_command == kWorkerExit)) {
    // Undo the re-entrancy guard before bailing out, so later calls succeed.
    self->depth_.fetch_add(-1, std::memory_order_acq_rel);
    return JXL_PARALLEL_RET_RUNNER_ERROR;
  }
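  // Illustrative encoding (not part of the control flow): start_range = 3 and
  // end_range = 7 yield worker_command = (3 << 32) + 7 = 0x0000000300000007;
  // RunRange recovers begin = command >> 32 = 3 and
  // end = command & 0xFFFFFFFF = 7.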
self->data_func_ = func;
self->jpegxl_opaque_ = jpegxl_opaque;
self->num_reserved_.store(0, std::memory_order_relaxed);
self->StartWorkers(worker_command);
self->WorkersReadyBarrier();
if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) {
return JXL_PARALLEL_RET_RUNNER_ERROR;
}
return JXL_PARALLEL_RET_SUCCESS;
}
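
// Illustrative usage sketch (hypothetical callbacks; `opaque` stands in for
// whatever state the caller threads through). The callback shapes follow the
// JxlParallelRunInit / JxlParallelRunFunction types from
// <jxl/parallel_runner.h>:
//
//   JxlParallelRetCode MyInit(void* opaque, size_t num_threads) {
//     // e.g. allocate num_threads per-thread scratch buffers
//     return JXL_PARALLEL_RET_SUCCESS;
//   }
//   void MyFunc(void* opaque, uint32_t task, size_t thread) {
//     // process `task` using the scratch buffer for `thread`
//   }
//
//   ThreadParallelRunner runner(/*num_worker_threads=*/4);
//   ThreadParallelRunner::Runner(&runner, opaque, MyInit, MyFunc,
//                                /*start_range=*/0, /*end_range=*/100);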
// static
void ThreadParallelRunner::RunRange(ThreadParallelRunner* self,
const WorkerCommand command,
const int thread) {
const uint32_t begin = command >> 32;
const uint32_t end = command & 0xFFFFFFFF;
const uint32_t num_tasks = end - begin;
const uint32_t num_worker_threads = self->num_worker_threads_;
// OpenMP introduced several "schedule" strategies:
// "single" (static assignment of exactly one chunk per thread): slower.
// "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
// "guided" (allocates k tasks, decreases k): computing k = remaining/n
// is faster than halving k each iteration. We prefer this strategy
// because it avoids user-specified parameters.
for (;;) {
#if JXL_FALSE
// dynamic
    const uint32_t my_size =
        std::max(num_tasks / (num_worker_threads * 4), 1u);
#else
// guided
const uint32_t num_reserved =
self->num_reserved_.load(std::memory_order_relaxed);
// It is possible that more tasks are reserved than ready to run.
const uint32_t num_remaining =
num_tasks - std::min(num_reserved, num_tasks);
const uint32_t my_size =
std::max(num_remaining / (num_worker_threads * 4), 1u);
#endif
const uint32_t my_begin = begin + self->num_reserved_.fetch_add(
my_size, std::memory_order_relaxed);
const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks);
// Another thread already reserved the last task.
if (my_begin >= my_end) {
break;
}
for (uint32_t task = my_begin; task < my_end; ++task) {
self->data_func_(self->jpegxl_opaque_, task, thread);
}
}
}
// static
void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self,
const int thread) {
// Until kWorkerExit command received:
for (;;) {
std::unique_lock<std::mutex> lock(self->mutex_);
// Notify main thread that this thread is ready.
if (++self->workers_ready_ == self->num_threads_) {
self->workers_ready_cv_.notify_one();
}
RESUME_WAIT:
// Wait for a command.
self->worker_start_cv_.wait(lock);
const WorkerCommand command = self->worker_start_command_;
switch (command) {
case kWorkerWait: // spurious wakeup:
goto RESUME_WAIT; // lock still held, avoid incrementing ready.
case kWorkerOnce:
lock.unlock();
self->data_func_(self->jpegxl_opaque_, thread, thread);
break;
case kWorkerExit:
return; // exits thread
default:
lock.unlock();
RunRange(self, command, thread);
break;
}
}
}
ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads)
: num_worker_threads_(num_worker_threads),
num_threads_(std::max(num_worker_threads, 1)) {
threads_.reserve(num_worker_threads_);
// Suppress "unused-private-field" warning.
(void)padding1;
(void)padding2;
// Safely handle spurious worker wakeups.
worker_start_command_ = kWorkerWait;
for (uint32_t i = 0; i < num_worker_threads_; ++i) {
threads_.emplace_back(ThreadFunc, this, i);
}
if (num_worker_threads_ != 0) {
WorkersReadyBarrier();
}
}
ThreadParallelRunner::~ThreadParallelRunner() {
if (num_worker_threads_ != 0) {
StartWorkers(kWorkerExit);
}
for (std::thread& thread : threads_) {
if (thread.joinable()) {
thread.join();
} else {
#if JXL_IS_DEBUG_BUILD
JXL_PRINT_STACK_TRACE();
JXL_CRASH();
#endif
}
}
}
} // namespace jpegxl