1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
#define BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
#include <optional>
#include <string_view>
#include <vector>
#include "base/base_export.h"
#include "base/gtest_prod_util.h"
#include "base/profiler/thread_group_profiler.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/thread_group.h"
#include "base/task/thread_pool/tracked_ref.h"
#include "base/task/thread_pool/worker_thread.h"
#include "base/task/thread_pool/worker_thread_set.h"
#include "base/time/time.h"
namespace base {
class WorkerThreadObserver;
class ThreadGroupProfiler;
namespace internal {
class TaskTracker;
// A group of |WorkerThread|s that run |Task|s.
//
// The thread group doesn't create threads until Start() is called. Tasks can be
// posted at any time but will not run until after Start() is called.
//
// This class is thread-safe.
class BASE_EXPORT ThreadGroupImpl : public ThreadGroup {
public:
// Constructs a group without workers.
//
// |histogram_label| is used to label the thread group's histograms as
// "ThreadPool." + histogram_name + "." + |histogram_label| + extra suffixes.
// It must not be empty. |thread group_label| is used to label the thread
// group's threads, it must not be empty. |thread_type_hint| is the preferred
// thread type; the actual thread type depends on shutdown state and platform
// capabilities. |thread_group_type| is used for thread group profiler to tag
// the profiles collected on this group. |task_tracker| keeps track of tasks.
ThreadGroupImpl(std::string_view histogram_label,
std::string_view thread_group_label,
ThreadType thread_type_hint,
int64_t thread_group_type,
TrackedRef<TaskTracker> task_tracker,
TrackedRef<Delegate> delegate);
ThreadGroupImpl(const ThreadGroupImpl&) = delete;
ThreadGroupImpl& operator=(const ThreadGroupImpl&) = delete;
// Destroying a ThreadGroupImpl returned by Create() is not allowed
// in production; it is always leaked. In tests, it can only be destroyed
// after JoinForTesting() has returned.
~ThreadGroupImpl() override;
// ThreadGroup:
void Start(size_t max_tasks,
size_t max_best_effort_tasks,
TimeDelta suggested_reclaim_time,
scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
WorkerThreadObserver* worker_thread_observer,
WorkerEnvironment worker_environment,
bool synchronous_thread_start_for_testing,
std::optional<TimeDelta> may_block_threshold) override;
void Start(size_t max_tasks,
size_t max_best_effort_tasks,
TimeDelta suggested_reclaim_time,
scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
WorkerThreadObserver* worker_thread_observer,
WorkerEnvironment worker_environment,
bool synchronous_thread_start_for_testing = false) {
Start(max_tasks, max_best_effort_tasks, suggested_reclaim_time,
service_thread_task_runner, worker_thread_observer,
worker_environment, synchronous_thread_start_for_testing, {});
}
void JoinForTesting() override;
void DidUpdateCanRunPolicy() override;
void OnShutdownStarted() override;
// Returns the number of workers that are idle (i.e. not running tasks).
size_t NumberOfIdleWorkersLockRequiredForTesting() const
EXCLUSIVE_LOCKS_REQUIRED(lock_) override;
private:
class ScopedCommandsExecutor;
class WorkerDelegate;
friend class WorkerDelegate;
// friend tests so that they can access |blocked_workers_poll_period| and
// may_block_threshold(), both in ThreadGroup.
friend class ThreadGroupImplBlockingTest;
friend class ThreadGroupImplMayBlockTest;
FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
ThreadBlockUnblockPremature);
FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
ThreadBlockUnblockPrematureBestEffort);
// ThreadGroup:
void UpdateSortKey(TaskSource::Transaction transaction) override;
void PushTaskSourceAndWakeUpWorkers(
RegisteredTaskSourceAndTransaction transaction_with_task_source) override;
void EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor* executor)
override EXCLUSIVE_LOCKS_REQUIRED(lock_);
void ScheduleAdjustMaxTasks() override;
void AdjustMaxTasks() override;
// Creates a worker and schedules its start, if needed, to maintain one idle
// worker, |max_tasks_| permitting.
void MaintainAtLeastOneIdleWorkerLockRequired(
ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Creates a worker, adds it to the thread group, schedules its start and
// returns it. Cannot be called before Start().
scoped_refptr<WorkerThread> CreateAndRegisterWorkerLockRequired(
ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Returns the number of workers that are awake (i.e. not on the idle set).
size_t GetNumAwakeWorkersLockRequired() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
bool IsOnIdleSetLockRequired(WorkerThread* worker) const
EXCLUSIVE_LOCKS_REQUIRED(lock_);
size_t worker_sequence_num_ GUARDED_BY(lock_) = 0;
// Ordered set of idle workers; the order uses pointer comparison, this is
// arbitrary but stable. Initially, all workers are on this set. A worker is
// removed from the set before its WakeUp() function is called and when it
// receives work from GetWork() (a worker calls GetWork() when its sleep
// timeout expires, even if its WakeUp() method hasn't been called). A worker
// is inserted on this set when it receives nullptr from GetWork().
WorkerThreadSet idle_workers_set_ GUARDED_BY(lock_);
// This is used in ThreadGroupProfiler to tag as metadata on profiles
// collected for worker threads within this thread group.
const int64_t thread_group_type_;
// This is set in Start() if profiling is enabled, before any worker thread is
// created. If profiling is not enabled, this will remain std::nullopt. If
// created the ThreadGroupProfiler instance will exist until ThreadGroupImpl
// destruction.
std::optional<ThreadGroupProfiler> thread_group_profiler_;
// Ensures recently cleaned up workers (ref.
// WorkerDelegate::CleanupLockRequired()) had time to exit as
// they have a raw reference to |this| (and to TaskTracker) which can
// otherwise result in racy use-after-frees per no longer being part of
// |workers_| and hence not being explicitly joined in JoinForTesting():
// https://crbug.com/810464. Uses AtomicRefCount to make its only public
// method thread-safe.
TrackedRefFactory<ThreadGroupImpl> tracked_ref_factory_;
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
|