1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
|
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_THREAD_POOL_SEQUENCE_H_
#define BASE_TASK_THREAD_POOL_SEQUENCE_H_
#include <stddef.h>
#include "base/base_export.h"
#include "base/compiler_specific.h"
#include "base/containers/intrusive_heap.h"
#include "base/containers/queue.h"
#include "base/sequence_token.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/pooled_parallel_task_runner.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/task_source_sort_key.h"
#include "base/thread_annotations.h"
#include "base/threading/sequence_local_storage_map.h"
namespace base {
namespace internal {
// A Sequence is intended to hold delayed tasks and immediate tasks.
// Delayed tasks are held in a prority_queue until they are ripe and
// immediate tasks in a simple fifo queue.
// When Sequence::TakeTask is called, we select the next appropriate task
// from both queues and return it.
// Each queue holds slots each containing up to a single Task that must be
// executed in posting/runtime order.
//
// In comments below, an "empty Sequence" is a Sequence with no slot.
//
// Note: there is a known refcounted-ownership cycle in the Scheduler
// architecture: Sequence -> Task -> TaskRunner -> Sequence -> ...
// This is okay so long as the other owners of Sequence (PriorityQueue and
// WorkerThread in alternation and
// ThreadGroup::WorkerThreadDelegateImpl::GetWork()
// temporarily) keep running it (and taking Tasks from it as a result). A
// dangling reference cycle would only occur should they release their reference
// to it while it's not empty. In other words, it is only correct for them to
// release it after PopTask() returns false to indicate it was made empty by
// that call (in which case the next PushImmediateTask() will return true to
// indicate to the caller that the Sequence should be re-enqueued for
// execution). This class is thread-safe.
class BASE_EXPORT Sequence : public TaskSource {
public:
// A Transaction can perform multiple operations atomically on a
// Sequence. While a Transaction is alive, it is guaranteed that nothing
// else will access the Sequence; the Sequence's lock is held for the
// lifetime of the Transaction.
class BASE_EXPORT Transaction : public TaskSource::Transaction {
public:
Transaction(Transaction&& other);
Transaction(const Transaction&) = delete;
Transaction& operator=(const Transaction&) = delete;
~Transaction();
// Returns true if the sequence must be added to the immediate queue after
// receiving a new immediate Task in order to be scheduled. If the caller
// doesn't want the sequence to be scheduled, it may not add the sequence to
// the immediate queue even if this returns true.
bool WillPushImmediateTask();
// Adds immediate |task| to the end of this sequence.
void PushImmediateTask(Task task);
// Adds a delayed |task| in this sequence, and returns true if the sequence
// needs to be re-enqueued in the delayed queue as a result of this
// sequence's delayed sort key changing.
bool PushDelayedTask(Task task);
Sequence* sequence() const { return static_cast<Sequence*>(task_source()); }
private:
friend class Sequence;
explicit Transaction(Sequence* sequence);
};
// |traits| is metadata that applies to all Tasks in the Sequence.
// |task_runner| is a reference to the TaskRunner feeding this TaskSource.
// |task_runner| can be nullptr only for tasks with no TaskRunner, in which
// case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the
// execution mode of |task_runner|.
Sequence(const TaskTraits& traits,
SequencedTaskRunner* task_runner,
TaskSourceExecutionMode execution_mode);
Sequence(const Sequence&) = delete;
Sequence& operator=(const Sequence&) = delete;
// Begins a Transaction. This method cannot be called on a thread which has an
// active Sequence::Transaction.
[[nodiscard]] Transaction BeginTransaction();
// TaskSource:
ExecutionEnvironment GetExecutionEnvironment() override;
size_t GetRemainingConcurrency() const override;
TaskSourceSortKey GetSortKey() const override;
TimeTicks GetDelayedSortKey() const override;
// Returns a token that uniquely identifies this Sequence.
const SequenceToken& token() const LIFETIME_BOUND { return token_; }
SequenceLocalStorageMap* sequence_local_storage() {
return &sequence_local_storage_;
}
bool OnBecomeReady() override;
bool has_worker_for_testing() const NO_THREAD_SAFETY_ANALYSIS {
return has_worker_;
}
bool is_immediate_for_testing() const { return is_immediate_; }
bool IsEmptyForTesting() const NO_THREAD_SAFETY_ANALYSIS { return IsEmpty(); }
// A reference to TaskRunner is only retained between
// PushImmediateTask()/PushDelayedTask() and when DidProcessTask() returns
// false, guaranteeing it is safe to dereference this pointer. Otherwise, the
// caller should guarantee such TaskRunner still exists before dereferencing.
SequencedTaskRunner* task_runner() const { return task_runner_; }
private:
~Sequence() override;
struct DelayedTaskGreater {
bool operator()(const Task& lhs, const Task& rhs) const;
};
// TaskSource:
RunStatus WillRunTask() override;
Task TakeTask(TaskSource::Transaction* transaction) override;
std::optional<Task> Clear(TaskSource::Transaction* transaction) override;
bool DidProcessTask(TaskSource::Transaction* transaction) override;
bool WillReEnqueue(TimeTicks now,
TaskSource::Transaction* transaction) override;
// Returns true if the delayed task to be posted will cause the delayed sort
// key to change.
bool DelayedSortKeyWillChange(const Task& delayed_task) const
EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Selects the earliest task to run, either from immediate or
// delayed queue and return it.
// Expects this sequence to have at least one task that can run
// immediately.
Task TakeEarliestTask() EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Get and return next task from immediate queue
Task TakeNextImmediateTask() EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Update the next earliest/latest ready time.
void UpdateReadyTimes() EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Returns true if there are immediate tasks
bool HasImmediateTasks() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Returns true if tasks ready to be executed
bool HasReadyTasks(TimeTicks now) const override;
bool IsEmpty() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Releases reference to TaskRunner.
void ReleaseTaskRunner();
const SequenceToken token_ = SequenceToken::Create();
// A pointer to the TaskRunner that posts to this TaskSource, if any. The
// derived class is responsible for calling AddRef() when a TaskSource from
// which no Task is executing becomes non-empty and Release() when
// it becomes empty again (e.g. when DidProcessTask() returns false).
//
// In practise, this pointer is going to become dangling. See task_runner()
// comment.
raw_ptr<SequencedTaskRunner, DisableDanglingPtrDetection> task_runner_;
// Queues of tasks to execute.
base::queue<Task> queue_ GUARDED_BY(lock_);
base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue_
GUARDED_BY(lock_);
// Caches the latest/earliest ready time for atomic access. Writes are
// protected by |lock_|, but allows atomic reads outside of |lock_|. If this
// sequence is empty, these are in an unknown state and shouldn't be read.
// Minimum of latest_delayed_run_time() of next delayed task if any, and
// |queue_time| of next immediate task if any.
std::atomic<TimeTicks> latest_ready_time_ GUARDED_BY(lock_){TimeTicks()};
// is_null() if there is an immediate task, or earliest_delayed_run_time() of
// next delayed task otherwise.
std::atomic<TimeTicks> earliest_ready_time_ GUARDED_BY(lock_){TimeTicks()};
// True if a worker is currently associated with a Task from this Sequence.
bool has_worker_ = false;
// True if the sequence has ready tasks and requested to be queued as such
// through WillPushImmediateTask() or OnBecomeReady(). Reset to false once all
// ready tasks are done being processed and either DidProcessTask() or
// WillReEnqueue() returned false. Normally, |is_immediate_| is protected by
// |lock_|, except in OnBecomeReady() hence the use of atomics.
std::atomic_bool is_immediate_{false};
// Holds data stored through the SequenceLocalStorageSlot API.
SequenceLocalStorageMap sequence_local_storage_;
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_THREAD_POOL_SEQUENCE_H_
|