1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709
|
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/thread_pool/task_tracker.h"
#include <atomic>
#include <optional>
#include <string>
#include <utility>
#include "base/base_switches.h"
#include "base/command_line.h"
#include "base/compiler_specific.h"
#include "base/debug/alias.h"
#include "base/functional/callback.h"
#include "base/json/json_writer.h"
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/metrics/histogram_macros.h"
#include "base/notreached.h"
#include "base/sequence_token.h"
#include "base/strings/string_util.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/scoped_set_task_priority_for_current_thread.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/task/thread_pool/job_task_source.h"
#include "base/task/thread_pool/task_source.h"
#include "base/threading/sequence_local_storage_map.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/trace_event/trace_event.h"
#include "base/tracing/protos/chrome_track_event.pbzero.h"
#include "base/values.h"
#include "build/build_config.h"
namespace base::internal {
namespace {
using perfetto::protos::pbzero::ChromeThreadPoolTask;
using perfetto::protos::pbzero::ChromeTrackEvent;
// Human-readable names for each TaskSourceExecutionMode value, indexed by the
// enum's numeric value.
constexpr const char* kExecutionModeString[] = {"parallel", "sequenced",
                                                "single thread", "job"};
// Compile-time guard: adding a TaskSourceExecutionMode enumerator requires
// adding a matching string above.
static_assert(
    std::size(kExecutionModeString) ==
        static_cast<size_t>(TaskSourceExecutionMode::kMax) + 1,
    "Array kExecutionModeString is out of sync with TaskSourceExecutionMode.");
// Returns true iff the process was launched with --log-best-effort-tasks.
bool HasLogBestEffortTasksSwitch() {
  // The CommandLine might not be initialized if ThreadPool is initialized in a
  // dynamic library which doesn't have access to argc/argv.
  if (!CommandLine::InitializedForCurrentProcess()) {
    return false;
  }
  return CommandLine::ForCurrentProcess()->HasSwitch(
      switches::kLogBestEffortTasks);
}
// Maps a TaskPriority to its trace-proto equivalent. The switch is exhaustive
// (no default) so the compiler flags new TaskPriority enumerators.
ChromeThreadPoolTask::Priority TaskPriorityToProto(TaskPriority priority) {
  switch (priority) {
    case TaskPriority::USER_BLOCKING:
      return ChromeThreadPoolTask::PRIORITY_USER_BLOCKING;
    case TaskPriority::USER_VISIBLE:
      return ChromeThreadPoolTask::PRIORITY_USER_VISIBLE;
    case TaskPriority::BEST_EFFORT:
      return ChromeThreadPoolTask::PRIORITY_BEST_EFFORT;
  }
}
// Maps a TaskSourceExecutionMode to its trace-proto equivalent. Exhaustive
// switch (no default) so new modes are caught at compile time.
ChromeThreadPoolTask::ExecutionMode ExecutionModeToProto(
    TaskSourceExecutionMode mode) {
  switch (mode) {
    case TaskSourceExecutionMode::kJob:
      return ChromeThreadPoolTask::EXECUTION_MODE_JOB;
    case TaskSourceExecutionMode::kSingleThread:
      return ChromeThreadPoolTask::EXECUTION_MODE_SINGLE_THREAD;
    case TaskSourceExecutionMode::kSequenced:
      return ChromeThreadPoolTask::EXECUTION_MODE_SEQUENCED;
    case TaskSourceExecutionMode::kParallel:
      return ChromeThreadPoolTask::EXECUTION_MODE_PARALLEL;
  }
}
// Maps a TaskShutdownBehavior to its trace-proto equivalent. Exhaustive switch
// (no default) so new behaviors are caught at compile time.
ChromeThreadPoolTask::ShutdownBehavior ShutdownBehaviorToProto(
    TaskShutdownBehavior shutdown_behavior) {
  switch (shutdown_behavior) {
    case TaskShutdownBehavior::BLOCK_SHUTDOWN:
      return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_BLOCK_SHUTDOWN;
    case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
      return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_SKIP_ON_SHUTDOWN;
    case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
      return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_CONTINUE_ON_SHUTDOWN;
  }
}
// If this is greater than 0 on a given thread, it will ignore the DCHECK which
// prevents posting BLOCK_SHUTDOWN tasks after shutdown. There are cases where
// posting back to a BLOCK_SHUTDOWN sequence is a coincidence rather than part
// of a shutdown blocking series of tasks, this prevents racy DCHECKs in those
// cases.
// Incremented/decremented via TaskTracker::Begin/EndFizzlingBlockShutdownTasks
// and balanced around each CONTINUE_ON_SHUTDOWN task run in RunTask().
constinit thread_local int fizzle_block_shutdown_tasks_ref = 0;
} // namespace
// Atomic internal state used by TaskTracker to track items that are blocking
// Shutdown. An "item" consist of either:
// - A running SKIP_ON_SHUTDOWN task
// - A queued/running BLOCK_SHUTDOWN TaskSource.
// Sequential consistency shouldn't be assumed from these calls (i.e. a thread
// reading |HasShutdownStarted() == true| isn't guaranteed to see all writes
// made before |StartShutdown()| on the thread that invoked it).
class TaskTracker::State {
 public:
  State() = default;
  State(const State&) = delete;
  State& operator=(const State&) = delete;

  // Sets a flag indicating that shutdown has started. Returns true if there are
  // items blocking shutdown. Can only be called once.
  bool StartShutdown() {
    const uint32_t old_value =
        bits_.fetch_or(kShutdownHasStartedMask, std::memory_order_relaxed);

    // Check that the "shutdown has started" bit isn't already set.
    DCHECK((old_value & kShutdownHasStartedMask) == 0);

    const auto num_items_blocking_shutdown =
        old_value >> kNumItemsBlockingShutdownBitOffset;
    return num_items_blocking_shutdown != 0;
  }

  // Returns true if shutdown has started.
  bool HasShutdownStarted() const {
    return bits_.load(std::memory_order_relaxed) & kShutdownHasStartedMask;
  }

  // Returns true if there are items blocking shutdown.
  bool AreItemsBlockingShutdown() const {
    const auto num_items_blocking_shutdown =
        bits_.load(std::memory_order_relaxed) >>
        kNumItemsBlockingShutdownBitOffset;
    return num_items_blocking_shutdown != 0;
  }

  // Increments the number of items blocking shutdown. Returns true if
  // shutdown has started.
  bool IncrementNumItemsBlockingShutdown() {
#if DCHECK_IS_ON()
    // Verify that no overflow will occur.
    const auto num_items_blocking_shutdown =
        bits_.load(std::memory_order_relaxed) >>
        kNumItemsBlockingShutdownBitOffset;
    DCHECK_LT(num_items_blocking_shutdown,
              std::numeric_limits<uint32_t>::max() -
                  kNumItemsBlockingShutdownIncrement);
#endif

    const uint32_t old_bits = bits_.fetch_add(
        kNumItemsBlockingShutdownIncrement, std::memory_order_relaxed);
    return old_bits & kShutdownHasStartedMask;
  }

  // Decrements the number of items blocking shutdown. Returns true if shutdown
  // has started and the number of tasks blocking shutdown becomes zero.
  bool DecrementNumItemsBlockingShutdown() {
    const uint32_t old_bits = bits_.fetch_sub(
        kNumItemsBlockingShutdownIncrement, std::memory_order_relaxed);
    const bool shutdown_has_started = old_bits & kShutdownHasStartedMask;
    const auto old_num_items_blocking_shutdown =
        old_bits >> kNumItemsBlockingShutdownBitOffset;
    // A decrement with no prior increment is a bookkeeping bug.
    DCHECK_GT(old_num_items_blocking_shutdown, 0u);

    return shutdown_has_started && old_num_items_blocking_shutdown == 1;
  }

 private:
  static constexpr uint32_t kShutdownHasStartedMask = 1;
  static constexpr uint32_t kNumItemsBlockingShutdownBitOffset = 1;
  static constexpr uint32_t kNumItemsBlockingShutdownIncrement =
      1 << kNumItemsBlockingShutdownBitOffset;

  // The LSB indicates whether shutdown has started. The other bits count the
  // number of items blocking shutdown.
  // No barriers are required to read/write |bits_| as this class is only used
  // as an atomic state checker, it doesn't provide sequential consistency
  // guarantees w.r.t. external state. Sequencing of the TaskTracker::State
  // operations themselves is guaranteed by the fetch_add() RMW (read-
  // modify-write) semantics however. For example, if two threads are racing to
  // call IncrementNumItemsBlockingShutdown() and StartShutdown() respectively,
  // either the first thread will win and the StartShutdown() call will see the
  // blocking task or the second thread will win and
  // IncrementNumItemsBlockingShutdown() will know that shutdown has started.
  std::atomic<uint32_t> bits_ = 0;
};
// Initializes shutdown bookkeeping. |shutdown_lock_| is declared as a
// predecessor of |flush_lock_| for lock-ordering checks, and |state_| holds
// the atomic shutdown/blocking-item counters.
TaskTracker::TaskTracker()
    : has_log_best_effort_tasks_switch_(HasLogBestEffortTasksSwitch()),
      state_(new State),
      can_run_policy_(CanRunPolicy::kAll),
      flush_cv_(flush_lock_.CreateConditionVariable()),
      shutdown_lock_(&flush_lock_),
      tracked_ref_factory_(this) {
  // |flush_cv_| is only waited upon in FlushForTesting(), avoid instantiating a
  // ScopedBlockingCallWithBaseSyncPrimitives from test threads intentionally
  // idling themselves to wait on the ThreadPool.
  flush_cv_.declare_only_used_while_idle();
}

TaskTracker::~TaskTracker() = default;
void TaskTracker::StartShutdown() {
  CheckedAutoLock auto_lock(shutdown_lock_);

  // Shutdown may only be started once.
  DCHECK(!shutdown_event_);
  DCHECK(!state_->HasShutdownStarted());

  shutdown_event_.emplace();

  const bool tasks_are_blocking_shutdown = state_->StartShutdown();
  if (tasks_are_blocking_shutdown) {
    // From this point on, whichever thread brings the number of items blocking
    // shutdown to zero signals |shutdown_event_| (see
    // DecrementNumItemsBlockingShutdown()).
    return;
  }

  // Nothing blocks shutdown: signal completion right away. Should another
  // thread post a BLOCK_SHUTDOWN task at this moment, it blocks until this
  // method releases |shutdown_lock_| and then fails
  // DCHECK(!shutdown_event_->IsSignaled()) — the desired behavior, since
  // posting a BLOCK_SHUTDOWN task after StartShutdown() when nothing blocks
  // shutdown isn't allowed.
  shutdown_event_->Signal();
}
// Blocks until all items blocking shutdown have completed, then unblocks any
// pending flush waiters/callbacks.
void TaskTracker::CompleteShutdown() {
  // It is safe to access |shutdown_event_| without holding |lock_| because the
  // pointer never changes after being set by StartShutdown(), which must
  // happen-before this.
  DCHECK(TS_UNCHECKED_READ(shutdown_event_));
  {
    base::ScopedAllowBaseSyncPrimitives allow_wait;
    // Allow tests to wait for and introduce logging about the shutdown tasks
    // before we block this thread.
    BeginCompleteShutdown(*TS_UNCHECKED_READ(shutdown_event_));
    // Now block the thread until all tasks are done.
    TS_UNCHECKED_READ(shutdown_event_)->Wait();
  }
  // Unblock FlushForTesting() and perform the FlushAsyncForTesting callback
  // when shutdown completes.
  {
    CheckedAutoLock auto_lock(flush_lock_);
    flush_cv_.Broadcast();
  }
  InvokeFlushCallbacksForTesting();
}
void TaskTracker::FlushForTesting() {
  AssertFlushForTestingAllowed();
  CheckedAutoLock auto_lock(flush_lock_);
  // Block until every registered task source has completed, or until shutdown
  // completes first; both paths broadcast |flush_cv_|.
  for (;;) {
    const bool all_sources_complete =
        num_incomplete_task_sources_.load(std::memory_order_acquire) == 0;
    if (all_sources_complete || IsShutdownComplete()) {
      break;
    }
    flush_cv_.Wait();
  }
}
void TaskTracker::FlushAsyncForTesting(OnceClosure flush_callback) {
  DCHECK(flush_callback);
  {
    CheckedAutoLock auto_lock(flush_lock_);
    flush_callbacks_for_testing_.push_back(std::move(flush_callback));
  }
  // If everything is already flushed (or shutdown already completed), run the
  // pending callbacks immediately instead of waiting for the last task source
  // to complete.
  const bool nothing_incomplete =
      num_incomplete_task_sources_.load(std::memory_order_acquire) == 0;
  if (nothing_incomplete || IsShutdownComplete()) {
    InvokeFlushCallbacksForTesting();
  }
}
// Atomically publishes the policy consulted by CanRunPriority().
void TaskTracker::SetCanRunPolicy(CanRunPolicy can_run_policy) {
  can_run_policy_.store(can_run_policy);
}
// Assigns the job its global sequence number and lets it record queue-time
// annotations before it is enqueued.
void TaskTracker::WillEnqueueJob(JobTaskSource* task_source) {
  task_source->WillEnqueue(sequence_nums_.GetNext(), task_annotator_);
}
bool TaskTracker::WillPostTask(Task* task,
                               TaskShutdownBehavior shutdown_behavior) {
  DCHECK(task);
  DCHECK(task->task);

  task->sequence_num = sequence_nums_.GetNext();

  if (state_->HasShutdownStarted()) {
    // Once shutdown has started, only an immediate (non-delayed)
    // BLOCK_SHUTDOWN task may still be posted, and only while no
    // ScopedFizzleBlockShutdownTasks is active on this thread.
    const bool must_fizzle =
        shutdown_behavior != TaskShutdownBehavior::BLOCK_SHUTDOWN ||
        !task->delayed_run_time.is_null() ||
        fizzle_block_shutdown_tasks_ref > 0;
    if (must_fizzle) {
      return false;
    }

    // A BLOCK_SHUTDOWN task posted after shutdown has completed is an ordering
    // bug; this aims to catch those early. When posting back to a
    // BLOCK_SHUTDOWN sequence is merely a racy coincidence (i.e. from a task
    // that wasn't itself guaranteed to finish before shutdown), callers bump
    // `fizzle_block_shutdown_tasks_ref` via ScopedFizzleBlockShutdownTasks to
    // bypass this DCHECK.
    CheckedAutoLock auto_lock(shutdown_lock_);
    DCHECK(shutdown_event_);
    DCHECK(!shutdown_event_->IsSignaled())
        << "posted_from: " << task->posted_from.ToString();
  }

  // TODO(scheduler-dev): Record the task traits here.
  task_annotator_.WillQueueTask("ThreadPool_PostTask", task);
  return true;
}
bool TaskTracker::WillPostTaskNow(const Task& task,
                                  TaskPriority priority) const {
  // A delayed task's TaskShutdownBehavior is implicitly capped at
  // SKIP_ON_SHUTDOWN: TaskTracker never waits on a delayed task in a
  // BLOCK_SHUTDOWN TaskSource and skips delayed tasks that happen to become
  // ripe during shutdown.
  const bool is_delayed = !task.delayed_run_time.is_null();
  if (is_delayed && state_->HasShutdownStarted()) {
    return false;
  }

  if (priority == TaskPriority::BEST_EFFORT &&
      has_log_best_effort_tasks_switch_) {
    // Surface BEST_EFFORT posting sites when --log-best-effort-tasks is set.
    LOG(INFO) << task.posted_from.ToString();
  }
  return true;
}
RegisteredTaskSource TaskTracker::RegisterTaskSource(
    scoped_refptr<TaskSource> task_source) {
  DCHECK(task_source);

  // Refuse registration when the source's shutdown behavior no longer allows
  // queueing (see BeforeQueueTaskSource()).
  if (!BeforeQueueTaskSource(task_source->shutdown_behavior())) {
    return nullptr;
  }

  // Count the source so flush waiters know when everything has completed;
  // balanced in DecrementNumIncompleteTaskSources().
  num_incomplete_task_sources_.fetch_add(1, std::memory_order_relaxed);
  return RegisteredTaskSource(std::move(task_source), this);
}
bool TaskTracker::CanRunPriority(TaskPriority priority) const {
  // Snapshot the policy once; it may be updated concurrently by
  // SetCanRunPolicy().
  const auto policy = can_run_policy_.load();

  if (policy == CanRunPolicy::kAll) {
    return true;
  }
  // Under kForegroundOnly, only USER_VISIBLE and above may run; any other
  // policy allows nothing.
  return policy == CanRunPolicy::kForegroundOnly &&
         priority >= TaskPriority::USER_VISIBLE;
}
// Runs the next task from |task_source| — or its Clear() closure when its
// tasks may no longer run — and returns |task_source| iff it must be
// re-enqueued.
RegisteredTaskSource TaskTracker::RunAndPopNextTask(
    RegisteredTaskSource task_source) {
  DCHECK(task_source);
  // Whether this source's tasks may still run given its shutdown behavior and
  // the current shutdown state (may bump the blocking-shutdown count for
  // SKIP_ON_SHUTDOWN; balanced by AfterRunTask() below).
  const bool should_run_tasks = BeforeRunTask(task_source->shutdown_behavior());
  // Run the next task in |task_source|.
  std::optional<Task> task;
  TaskTraits traits;
  {
    auto transaction = task_source->BeginTransaction();
    task = should_run_tasks ? task_source.TakeTask(&transaction)
                            : task_source.Clear(&transaction);
    traits = transaction.traits();
  }
  if (task) {
    // Skip delayed tasks if shutdown started.
    if (!task->delayed_run_time.is_null() && state_->HasShutdownStarted()) {
      // Replace the closure with a no-op that still owns (and thus destroys)
      // the bound arguments.
      task->task = base::DoNothingWithBoundArgs(std::move(task->task));
    }
    // Run the |task| (whether it's a worker task or the Clear() closure).
    RunTask(std::move(task.value()), task_source.get(), traits);
  }
  if (should_run_tasks) {
    AfterRunTask(task_source->shutdown_behavior());
  }
  const bool task_source_must_be_queued = task_source.DidProcessTask();
  // |task_source| should be reenqueued iff requested by DidProcessTask().
  if (task_source_must_be_queued) {
    return task_source;
  }
  return nullptr;
}
// Returns true once StartShutdown() has been called (relaxed-memory read).
bool TaskTracker::HasShutdownStarted() const {
  return state_->HasShutdownStarted();
}
// Returns true once shutdown has both started and finished, i.e.
// |shutdown_event_| exists and has been signaled.
bool TaskTracker::IsShutdownComplete() const {
  CheckedAutoLock auto_lock(shutdown_lock_);
  return shutdown_event_ && shutdown_event_->IsSignaled();
}
// Enters a scope (on the current thread) where BLOCK_SHUTDOWN tasks posted
// after shutdown are silently dropped instead of DCHECKing; see
// |fizzle_block_shutdown_tasks_ref|.
void TaskTracker::BeginFizzlingBlockShutdownTasks() {
  ++fizzle_block_shutdown_tasks_ref;
}
// Leaves the scope opened by BeginFizzlingBlockShutdownTasks(); the counter
// must never go negative (unbalanced Begin/End is a bug).
void TaskTracker::EndFizzlingBlockShutdownTasks() {
  CHECK_GE(--fizzle_block_shutdown_tasks_ref, 0);
}
// Runs |task| with the thread-local environment (priority, sequence-local
// storage, default task runners, scoped restrictions) dictated by |traits|
// and |task_source|. The scoped objects below are destroyed in reverse order
// when the task finishes.
void TaskTracker::RunTask(Task task,
                          TaskSource* task_source,
                          const TaskTraits& traits) {
  DCHECK(task_source);

  const auto environment = task_source->GetExecutionEnvironment();

  // RAII helper that bumps `fizzle_block_shutdown_tasks_ref` for the duration
  // of a task so BLOCK_SHUTDOWN tasks it posts are fizzled rather than
  // DCHECKing (installed below for CONTINUE_ON_SHUTDOWN tasks only).
  struct BlockShutdownTaskFizzler {
    BlockShutdownTaskFizzler() {
      // Nothing outside RunTask should be bumping
      // `fizzle_block_shutdown_tasks_ref`.
      DCHECK_EQ(fizzle_block_shutdown_tasks_ref, 0);
      ++fizzle_block_shutdown_tasks_ref;
    }
    ~BlockShutdownTaskFizzler() {
      --fizzle_block_shutdown_tasks_ref;
      // The refs should be balanced after running the task.
      DCHECK_EQ(fizzle_block_shutdown_tasks_ref, 0);
    }
  };
  std::optional<ScopedDisallowSingleton> disallow_singleton;
  std::optional<ScopedDisallowBlocking> disallow_blocking;
  std::optional<ScopedDisallowBaseSyncPrimitives> disallow_sync_primitives;
  std::optional<BlockShutdownTaskFizzler> fizzle_block_shutdown_tasks;
  if (traits.shutdown_behavior() ==
      TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
    disallow_singleton.emplace();
    fizzle_block_shutdown_tasks.emplace();
  }
  if (!traits.may_block()) {
    disallow_blocking.emplace();
  }
  if (!traits.with_base_sync_primitives()) {
    disallow_sync_primitives.emplace();
  }

  {
    DCHECK(environment.token.IsValid());
    TaskScope task_scope(environment.token,
                         /* is_thread_bound=*/task_source->execution_mode() ==
                             TaskSourceExecutionMode::kSingleThread);
    ScopedSetTaskPriorityForCurrentThread
        scoped_set_task_priority_for_current_thread(traits.priority());

    // Local storage map used if none is provided by |environment|.
    std::optional<SequenceLocalStorageMap> local_storage_map;
    if (!environment.sequence_local_storage) {
      local_storage_map.emplace();
    }
    ScopedSetSequenceLocalStorageMapForCurrentThread
        scoped_set_sequence_local_storage_map_for_current_thread(
            environment.sequence_local_storage
                ? environment.sequence_local_storage
                : &local_storage_map.value());

    // Set up TaskRunner CurrentDefaultHandle as expected for the scope of the
    // task.
    std::optional<SequencedTaskRunner::CurrentDefaultHandle>
        sequenced_task_runner_current_default_handle;
    std::optional<SingleThreadTaskRunner::CurrentDefaultHandle>
        single_thread_task_runner_current_default_handle;
    if (environment.sequenced_task_runner) {
      DCHECK_EQ(TaskSourceExecutionMode::kSequenced,
                task_source->execution_mode());
      sequenced_task_runner_current_default_handle.emplace(
          environment.sequenced_task_runner);
    } else if (environment.single_thread_task_runner) {
      DCHECK_EQ(TaskSourceExecutionMode::kSingleThread,
                task_source->execution_mode());
      single_thread_task_runner_current_default_handle.emplace(
          environment.single_thread_task_runner);
    } else {
      // Parallel and job sources get no default task runner.
      DCHECK_NE(TaskSourceExecutionMode::kSequenced,
                task_source->execution_mode());
      DCHECK_NE(TaskSourceExecutionMode::kSingleThread,
                task_source->execution_mode());
    }

    RunTaskWithShutdownBehavior(task, traits, task_source, environment.token);

    // Make sure the arguments bound to the callback are deleted within the
    // scope in which the callback runs.
    task.task = OnceClosure();
  }
}
// Hook invoked just before CompleteShutdown() blocks on |shutdown_event|.
void TaskTracker::BeginCompleteShutdown(base::WaitableEvent& shutdown_event) {
  // Do nothing in production, tests may override this.
}
// Returns true while any registered task source has not yet completed.
bool TaskTracker::HasIncompleteTaskSourcesForTesting() const {
  return num_incomplete_task_sources_.load(std::memory_order_acquire) != 0;
}
bool TaskTracker::BeforeQueueTaskSource(
    TaskShutdownBehavior shutdown_behavior) {
  if (shutdown_behavior != TaskShutdownBehavior::BLOCK_SHUTDOWN) {
    // A non BLOCK_SHUTDOWN task source may only be queued before shutdown has
    // started.
    return !state_->HasShutdownStarted();
  }

  // BLOCK_SHUTDOWN task sources block shutdown between the moment they are
  // queued and the moment their last task completes its execution.
  const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
  if (shutdown_started) {
    // A BLOCK_SHUTDOWN task posted after shutdown has completed is an
    // ordering bug. This aims to catch those early.
    CheckedAutoLock auto_lock(shutdown_lock_);
    DCHECK(shutdown_event_);
    DCHECK(!shutdown_event_->IsSignaled());
  }
  return true;
}
// Returns true iff a task with |shutdown_behavior| may run now. For
// SKIP_ON_SHUTDOWN this also bumps the blocking-shutdown count, which
// AfterRunTask() releases once the task finishes.
bool TaskTracker::BeforeRunTask(TaskShutdownBehavior shutdown_behavior) {
  switch (shutdown_behavior) {
    case TaskShutdownBehavior::BLOCK_SHUTDOWN: {
      // The number of tasks blocking shutdown has been incremented when the
      // task was posted.
      DCHECK(state_->AreItemsBlockingShutdown());

      // Trying to run a BLOCK_SHUTDOWN task after shutdown has completed is
      // unexpected as it either shouldn't have been posted if shutdown
      // completed or should be blocking shutdown if it was posted before it
      // did.
      DCHECK(!state_->HasShutdownStarted() || !IsShutdownComplete());

      return true;
    }

    case TaskShutdownBehavior::SKIP_ON_SHUTDOWN: {
      // SKIP_ON_SHUTDOWN tasks block shutdown while they are running.
      const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();

      if (shutdown_started) {
        // The SKIP_ON_SHUTDOWN task isn't allowed to run during shutdown.
        // Decrement the number of tasks blocking shutdown that was wrongly
        // incremented.
        DecrementNumItemsBlockingShutdown();
        return false;
      }

      return true;
    }

    case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN: {
      return !state_->HasShutdownStarted();
    }
  }

  NOTREACHED();
}
// Releases the blocking-shutdown item acquired by BeforeRunTask() for a
// SKIP_ON_SHUTDOWN task; other behaviors need no per-run balancing here.
void TaskTracker::AfterRunTask(TaskShutdownBehavior shutdown_behavior) {
  if (shutdown_behavior == TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
    DecrementNumItemsBlockingShutdown();
  }
}
scoped_refptr<TaskSource> TaskTracker::UnregisterTaskSource(
    scoped_refptr<TaskSource> task_source) {
  DCHECK(task_source);

  // A BLOCK_SHUTDOWN source stops blocking shutdown once it is unregistered;
  // this balances the increment done in BeforeQueueTaskSource().
  const bool blocks_shutdown =
      task_source->shutdown_behavior() == TaskShutdownBehavior::BLOCK_SHUTDOWN;
  if (blocks_shutdown) {
    DecrementNumItemsBlockingShutdown();
  }
  DecrementNumIncompleteTaskSources();
  return task_source;
}
void TaskTracker::DecrementNumItemsBlockingShutdown() {
  // Signal only when shutdown has started AND this was the last item blocking
  // it — that wakes the thread parked in CompleteShutdown().
  if (state_->DecrementNumItemsBlockingShutdown()) {
    CheckedAutoLock auto_lock(shutdown_lock_);
    DCHECK(shutdown_event_);
    shutdown_event_->Signal();
  }
}
void TaskTracker::DecrementNumIncompleteTaskSources() {
const auto prev_num_incomplete_task_sources =
num_incomplete_task_sources_.fetch_sub(1);
DCHECK_GE(prev_num_incomplete_task_sources, 1);
if (prev_num_incomplete_task_sources == 1) {
{
CheckedAutoLock auto_lock(flush_lock_);
flush_cv_.Broadcast();
}
InvokeFlushCallbacksForTesting();
}
}
void TaskTracker::InvokeFlushCallbacksForTesting() {
base::circular_deque<OnceClosure> flush_callbacks;
{
CheckedAutoLock auto_lock(flush_lock_);
flush_callbacks = std::move(flush_callbacks_for_testing_);
}
for (auto& flush_callback : flush_callbacks) {
std::move(flush_callback).Run();
}
}
// Attaches thread-pool metadata (flow id, priority, execution mode, shutdown
// behavior, sequence token) to the trace event in |ctx|, gated on the
// relevant trace categories being enabled.
void TaskTracker::EmitThreadPoolTraceEventMetadata(perfetto::EventContext& ctx,
                                                   const TaskTraits& traits,
                                                   TaskSource* task_source,
                                                   const SequenceToken& token) {
  if (TRACE_EVENT_CATEGORY_ENABLED("scheduler.flow")) {
    if (token.IsValid()) {
      // Derive a flow id unique to this (tracker, sequence) pair by mixing the
      // tracker's address with the sequence token value.
      ctx.event()->add_flow_ids(reinterpret_cast<uint64_t>(this) ^
                                static_cast<uint64_t>(token.ToInternalValue()));
    }
  }
  // Other parameters are included only when "scheduler" category is enabled.
  if (TRACE_EVENT_CATEGORY_ENABLED("scheduler")) {
    auto* task = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
                     ->set_thread_pool_task();
    task->set_task_priority(TaskPriorityToProto(traits.priority()));
    task->set_execution_mode(
        ExecutionModeToProto(task_source->execution_mode()));
    task->set_shutdown_behavior(
        ShutdownBehaviorToProto(traits.shutdown_behavior()));
    if (token.IsValid()) {
      task->set_sequence_token(token.ToInternalValue());
    }
  }
}
// Thin wrapper around RunTaskImpl() for CONTINUE_ON_SHUTDOWN tasks. NOINLINE
// plus NO_CODE_FOLDING keep this a distinct, un-merged symbol so the shutdown
// behavior is identifiable from the stack.
NOINLINE void TaskTracker::RunContinueOnShutdown(Task& task,
                                                 const TaskTraits& traits,
                                                 TaskSource* task_source,
                                                 const SequenceToken& token) {
  NO_CODE_FOLDING();
  RunTaskImpl(task, traits, task_source, token);
}
// Thin wrapper around RunTaskImpl() for SKIP_ON_SHUTDOWN tasks. NOINLINE plus
// NO_CODE_FOLDING keep this a distinct, un-merged symbol so the shutdown
// behavior is identifiable from the stack.
NOINLINE void TaskTracker::RunSkipOnShutdown(Task& task,
                                             const TaskTraits& traits,
                                             TaskSource* task_source,
                                             const SequenceToken& token) {
  NO_CODE_FOLDING();
  RunTaskImpl(task, traits, task_source, token);
}
// Thin wrapper around RunTaskImpl() for BLOCK_SHUTDOWN tasks. NOINLINE plus
// NO_CODE_FOLDING keep this a distinct, un-merged symbol so the shutdown
// behavior is identifiable from the stack.
NOINLINE void TaskTracker::RunBlockShutdown(Task& task,
                                            const TaskTraits& traits,
                                            TaskSource* task_source,
                                            const SequenceToken& token) {
  NO_CODE_FOLDING();
  RunTaskImpl(task, traits, task_source, token);
}
// Executes |task| through the annotator, attaching thread-pool trace metadata
// to the "ThreadPool_RunTask" event.
void TaskTracker::RunTaskImpl(Task& task,
                              const TaskTraits& traits,
                              TaskSource* task_source,
                              const SequenceToken& token) {
  task_annotator_.RunTask(
      "ThreadPool_RunTask", task, [&](perfetto::EventContext& ctx) {
        EmitThreadPoolTraceEventMetadata(ctx, traits, task_source, token);
      });
}
// Dispatches to the shutdown-behavior-specific out-of-line wrapper (each one
// is NOINLINE with NO_CODE_FOLDING so it stays a distinct symbol).
void TaskTracker::RunTaskWithShutdownBehavior(Task& task,
                                              const TaskTraits& traits,
                                              TaskSource* task_source,
                                              const SequenceToken& token) {
  switch (traits.shutdown_behavior()) {
    case TaskShutdownBehavior::BLOCK_SHUTDOWN:
      RunBlockShutdown(task, traits, task_source, token);
      return;
    case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
      RunSkipOnShutdown(task, traits, task_source, token);
      return;
    case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
      RunContinueOnShutdown(task, traits, task_source, token);
      return;
  }
}
} // namespace base::internal
|