// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/job_task_source.h"

#include <algorithm>
#include <bit>
#include <limits>
#include <type_traits>
#include <utility>

#include "base/check_op.h"
#include "base/functional/bind.h"
#include "base/functional/callback_helpers.h"
#include "base/memory/ptr_util.h"
#include "base/notreached.h"
#include "base/task/common/checked_lock.h"
#include "base/task/task_features.h"
#include "base/task/thread_pool/pooled_task_runner_delegate.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/time/time_override.h"
#include "base/trace_event/trace_event.h"

namespace base::internal {

namespace {
// Capped to allow assigning task_ids from a bitfield.
constexpr size_t kMaxWorkersPerJob = 32;
static_assert(
kMaxWorkersPerJob <=
std::numeric_limits<
std::invoke_result<decltype(&JobDelegate::GetTaskId),
JobDelegate>::type>::max(),
"AcquireTaskId return type isn't big enough to fit kMaxWorkersPerJob");
} // namespace
JobTaskSource::State::State() = default;
JobTaskSource::State::~State() = default;
JobTaskSource::State::Value JobTaskSource::State::Cancel() {
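  // Only the canceled bit is set; the worker count bits are left untouched so
  // that workers which are already running can finish and decrement the count
  // normally.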
return {value_.fetch_or(kCanceledMask, std::memory_order_relaxed)};
}
JobTaskSource::State::Value JobTaskSource::State::DecrementWorkerCount() {
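  // The worker count is stored in the bits above kWorkerCountBitOffset, so
  // subtracting kWorkerCountIncrement decrements it without touching the
  // canceled bit.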
const uint32_t value_before_sub =
value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed);
DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
return {value_before_sub};
}
JobTaskSource::State::Value JobTaskSource::State::IncrementWorkerCount() {
uint32_t value_before_add =
value_.fetch_add(kWorkerCountIncrement, std::memory_order_relaxed);
// The worker count must not overflow a uint8_t.
DCHECK((value_before_add >> kWorkerCountBitOffset) < ((1 << 8) - 1));
return {value_before_add};
}
JobTaskSource::State::Value JobTaskSource::State::Load() const {
return {value_.load(std::memory_order_relaxed)};
}
JobTaskSource::JoinFlag::JoinFlag() = default;
JobTaskSource::JoinFlag::~JoinFlag() = default;
void JobTaskSource::JoinFlag::Reset() {
value_.store(kNotWaiting, std::memory_order_relaxed);
}
void JobTaskSource::JoinFlag::SetWaiting() {
value_.store(kWaitingForWorkerToYield, std::memory_order_relaxed);
}
bool JobTaskSource::JoinFlag::ShouldWorkerYield() {
// The fetch_and() sets the state to kWaitingForWorkerToSignal if it was
// previously kWaitingForWorkerToYield, otherwise it leaves it unchanged.
return value_.fetch_and(kWaitingForWorkerToSignal,
std::memory_order_relaxed) ==
kWaitingForWorkerToYield;
}
bool JobTaskSource::JoinFlag::ShouldWorkerSignal() {
return value_.exchange(kNotWaiting, std::memory_order_relaxed) != kNotWaiting;
}
JobTaskSource::JobTaskSource(const Location& from_here,
const TaskTraits& traits,
RepeatingCallback<void(JobDelegate*)> worker_task,
MaxConcurrencyCallback max_concurrency_callback,
PooledTaskRunnerDelegate* delegate)
: TaskSource(traits, TaskSourceExecutionMode::kJob),
max_concurrency_callback_(std::move(max_concurrency_callback)),
worker_task_(std::move(worker_task)),
primary_task_(base::BindRepeating(
[](JobTaskSource* self) {
CheckedLock::AssertNoLockHeldOnCurrentThread();
// Each worker task has its own delegate with associated state.
JobDelegate job_delegate{self, self->delegate_};
self->worker_task_.Run(&job_delegate);
},
base::Unretained(this))),
task_metadata_(from_here),
ready_time_(TimeTicks::Now()),
delegate_(delegate) {
DCHECK(delegate_);
task_metadata_.sequence_num = -1;
}
JobTaskSource::~JobTaskSource() {
// Make sure there's no outstanding active run operation left.
DCHECK_EQ(state_.Load().worker_count(), 0U);
}
ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() {
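  // Job worker tasks aren't sequenced with one another, so each run gets a
  // freshly created sequence token.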
return {SequenceToken::Create()};
}
void JobTaskSource::WillEnqueue(int sequence_num, TaskAnnotator& annotator) {
if (task_metadata_.sequence_num != -1) {
// WillEnqueue() was already called.
return;
}
task_metadata_.sequence_num = sequence_num;
annotator.WillQueueTask("ThreadPool_PostJob", &task_metadata_);
}
bool JobTaskSource::WillJoin() {
TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
CheckedAutoLock auto_lock(worker_lock_);
DCHECK(!worker_released_condition_); // This may only be called once.
worker_lock_.CreateConditionVariableAndEmplace(worker_released_condition_);
// Prevent wait from triggering a ScopedBlockingCall as this would cause
// |ThreadGroup::lock_| to be acquired, causing lock inversion.
worker_released_condition_->declare_only_used_while_idle();
const auto state_before_add = state_.IncrementWorkerCount();
if (!state_before_add.is_canceled() &&
state_before_add.worker_count() <
GetMaxConcurrency(state_before_add.worker_count())) {
return true;
}
return WaitForParticipationOpportunity();
}
bool JobTaskSource::RunJoinTask() {
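  // The joining thread isn't a thread pool worker, hence the null
  // PooledTaskRunnerDelegate passed to its JobDelegate.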
JobDelegate job_delegate{this, nullptr};
worker_task_.Run(&job_delegate);
// It is safe to read |state_| without a lock since this variable is atomic
// and the call to GetMaxConcurrency() is used for a best effort early exit.
// Stale values will only cause WaitForParticipationOpportunity() to be
// called.
const auto state = TS_UNCHECKED_READ(state_).Load();
// The condition is slightly different from the one in WillJoin() since we're
// using |state| that was already incremented to include the joining thread.
if (!state.is_canceled() &&
state.worker_count() <= GetMaxConcurrency(state.worker_count() - 1)) {
return true;
}
TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
CheckedAutoLock auto_lock(worker_lock_);
return WaitForParticipationOpportunity();
}
void JobTaskSource::Cancel(TaskSource::Transaction* transaction) {
// Sets the kCanceledMask bit on |state_| so that further calls to
// WillRunTask() never succeed. std::memory_order_relaxed without a lock is
// safe because this task source never needs to be re-enqueued after Cancel().
TS_UNCHECKED_READ(state_).Cancel();
}
// EXCLUSIVE_LOCK_REQUIRED(worker_lock_)
bool JobTaskSource::WaitForParticipationOpportunity() {
DCHECK(!join_flag_.IsWaiting());
  // std::memory_order_relaxed is sufficient because no other state is
  // synchronized with |state_| outside of |worker_lock_|.
auto state = state_.Load();
// |worker_count - 1| to exclude the joining thread which is not active.
size_t max_concurrency = GetMaxConcurrency(state.worker_count() - 1);
// Wait until either:
// A) |worker_count| is below or equal to max concurrency and state is not
// canceled.
// B) All other workers returned and |worker_count| is 1.
while (!((state.worker_count() <= max_concurrency && !state.is_canceled()) ||
state.worker_count() == 1)) {
    // std::memory_order_relaxed is sufficient because no other state is
    // synchronized with |join_flag_| outside of |worker_lock_|.
join_flag_.SetWaiting();
    // To avoid waiting unnecessarily, whenever condition A) or B) changes,
    // |worker_lock_| is taken and |worker_released_condition_| is signaled if
    // necessary:
// 1- In DidProcessTask(), after worker count is decremented.
// 2- In NotifyConcurrencyIncrease(), following a max_concurrency increase.
worker_released_condition_->Wait();
state = state_.Load();
// |worker_count - 1| to exclude the joining thread which is not active.
max_concurrency = GetMaxConcurrency(state.worker_count() - 1);
}
// It's possible though unlikely that the joining thread got a participation
// opportunity without a worker signaling.
join_flag_.Reset();
// Case A:
if (state.worker_count() <= max_concurrency && !state.is_canceled()) {
return true;
}
// Case B:
// Only the joining thread remains.
DCHECK_EQ(state.worker_count(), 1U);
DCHECK(state.is_canceled() || max_concurrency == 0U);
state_.DecrementWorkerCount();
// Prevent subsequent accesses to user callbacks.
state_.Cancel();
return false;
}
TaskSource::RunStatus JobTaskSource::WillRunTask() {
CheckedAutoLock auto_lock(worker_lock_);
auto state_before_add = state_.Load();
// Don't allow this worker to run the task if either:
// A) |state_| was canceled.
// B) |worker_count| is already at |max_concurrency|.
// C) |max_concurrency| was lowered below or to |worker_count|.
// Case A:
if (state_before_add.is_canceled()) {
return RunStatus::kDisallowed;
}
const size_t max_concurrency =
GetMaxConcurrency(state_before_add.worker_count());
if (state_before_add.worker_count() < max_concurrency) {
state_before_add = state_.IncrementWorkerCount();
}
const size_t worker_count_before_add = state_before_add.worker_count();
// Case B) or C):
if (worker_count_before_add >= max_concurrency) {
return RunStatus::kDisallowed;
}
DCHECK_LT(worker_count_before_add, max_concurrency);
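  // The task source is saturated if this worker brings |worker_count| up to
  // |max_concurrency|: it shouldn't be handed to additional workers until a
  // worker returns or NotifyConcurrencyIncrease() re-enqueues it.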
return max_concurrency == worker_count_before_add + 1
? RunStatus::kAllowedSaturated
: RunStatus::kAllowedNotSaturated;
}
size_t JobTaskSource::GetRemainingConcurrency() const {
// It is safe to read |state_| without a lock since this variable is atomic,
// and no other state is synchronized with GetRemainingConcurrency().
const auto state = TS_UNCHECKED_READ(state_).Load();
if (state.is_canceled()) {
return 0;
}
const size_t max_concurrency = GetMaxConcurrency(state.worker_count());
// Avoid underflows.
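  // (|max_concurrency| can drop below the worker count when the user-provided
  // max concurrency callback starts returning a smaller value mid-run.)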
if (state.worker_count() > max_concurrency) {
return 0;
}
return max_concurrency - state.worker_count();
}
bool JobTaskSource::IsActive() const {
CheckedAutoLock auto_lock(worker_lock_);
auto state = state_.Load();
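  // The job is still active if it has remaining work (max concurrency above
  // zero) or if some workers haven't returned from the worker task yet.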
return GetMaxConcurrency(state.worker_count()) != 0 ||
state.worker_count() != 0;
}
size_t JobTaskSource::GetWorkerCount() const {
return TS_UNCHECKED_READ(state_).Load().worker_count();
}
void JobTaskSource::NotifyConcurrencyIncrease() {
// Avoid unnecessary locks when NotifyConcurrencyIncrease() is spuriously
// called.
if (GetRemainingConcurrency() == 0) {
return;
}
{
// Lock is taken to access |join_flag_| below and signal
// |worker_released_condition_|.
CheckedAutoLock auto_lock(worker_lock_);
if (join_flag_.ShouldWorkerSignal()) {
worker_released_condition_->Signal();
}
}
// Make sure the task source is in the queue if not already.
// Caveat: it's possible but unlikely that the task source has already reached
// its intended concurrency and doesn't need to be enqueued if there
// previously were too many workers. For simplicity, the task source is always
// enqueued and will get discarded if already saturated when it is popped from
// the priority queue.
delegate_->EnqueueJobTaskSource(this);
}
size_t JobTaskSource::GetMaxConcurrency() const {
return GetMaxConcurrency(TS_UNCHECKED_READ(state_).Load().worker_count());
}
size_t JobTaskSource::GetMaxConcurrency(size_t worker_count) const {
return std::min(max_concurrency_callback_.Run(worker_count),
kMaxWorkersPerJob);
}
uint8_t JobTaskSource::AcquireTaskId() {
static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8,
"TaskId bitfield isn't big enough to fit kMaxWorkersPerJob.");
uint32_t assigned_task_ids =
assigned_task_ids_.load(std::memory_order_relaxed);
uint32_t new_assigned_task_ids = 0;
int task_id = 0;
// memory_order_acquire on success, matched with memory_order_release in
// ReleaseTaskId() so that operations done by previous threads that had
// the same task_id become visible to the current thread.
do {
// Count trailing one bits. This is the id of the right-most 0-bit in
// |assigned_task_ids|.
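    // For example, with |assigned_task_ids| == 0b0111, ids 0-2 are taken,
    // countr_one() returns 3, and id 3 is acquired.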
task_id = std::countr_one(assigned_task_ids);
new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id);
} while (!assigned_task_ids_.compare_exchange_weak(
assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire,
std::memory_order_relaxed));
return static_cast<uint8_t>(task_id);
}
void JobTaskSource::ReleaseTaskId(uint8_t task_id) {
// memory_order_release to match AcquireTaskId().
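  // E.g. releasing task id 3 ANDs the bitfield with ~0b1000, clearing bit 3.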
uint32_t previous_task_ids = assigned_task_ids_.fetch_and(
~(uint32_t(1) << task_id), std::memory_order_release);
DCHECK(previous_task_ids & (uint32_t(1) << task_id));
}
bool JobTaskSource::ShouldYield() {
// It is safe to read |join_flag_| and |state_| without a lock since these
// variables are atomic, keeping in mind that threads may not immediately see
// the new value when it is updated.
return TS_UNCHECKED_READ(join_flag_).ShouldWorkerYield() ||
TS_UNCHECKED_READ(state_).Load().is_canceled();
}
Task JobTaskSource::TakeTask(TaskSource::Transaction* transaction) {
// JobTaskSource members are not lock-protected so no need to acquire a lock
// if |transaction| is nullptr.
DCHECK_GT(TS_UNCHECKED_READ(state_).Load().worker_count(), 0U);
DCHECK(primary_task_);
return {task_metadata_, primary_task_};
}
bool JobTaskSource::DidProcessTask(TaskSource::Transaction* /*transaction*/) {
// Lock is needed to access |join_flag_| below and signal
// |worker_released_condition_|.
CheckedAutoLock auto_lock(worker_lock_);
const auto state_before_sub = state_.DecrementWorkerCount();
if (join_flag_.ShouldWorkerSignal()) {
worker_released_condition_->Signal();
}
// A canceled task source should never get re-enqueued.
if (state_before_sub.is_canceled()) {
return false;
}
DCHECK_GT(state_before_sub.worker_count(), 0U);
// Re-enqueue the TaskSource if the task ran and the worker count is below the
// max concurrency.
// |worker_count - 1| to exclude the returning thread.
return state_before_sub.worker_count() <=
GetMaxConcurrency(state_before_sub.worker_count() - 1);
}
// This is a no-op and should always return true.
bool JobTaskSource::WillReEnqueue(TimeTicks now,
TaskSource::Transaction* /*transaction*/) {
return true;
}
// This is a no-op.
bool JobTaskSource::OnBecomeReady() {
return false;
}
TaskSourceSortKey JobTaskSource::GetSortKey() const {
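  // Jobs are ranked by priority, by when they became ready, and by how many
  // workers are currently assigned to them.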
return TaskSourceSortKey(priority_racy(), ready_time_,
TS_UNCHECKED_READ(state_).Load().worker_count());
}
// This function isn't expected to be called since a job is never delayed.
// However, the class still needs to provide an override.
TimeTicks JobTaskSource::GetDelayedSortKey() const {
return TimeTicks();
}
// This function isn't expected to be called since a job is never delayed.
// However, the class still needs to provide an override.
bool JobTaskSource::HasReadyTasks(TimeTicks now) const {
NOTREACHED();
}
std::optional<Task> JobTaskSource::Clear(TaskSource::Transaction* transaction) {
Cancel();
// Nothing is cleared since other workers might still racily run tasks. For
// simplicity, the destructor will take care of it once all references are
// released.
return std::nullopt;
}
} // namespace base::internal