1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
|
// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/webrtc_overrides/task_queue_factory.h"
#include <memory>
#include <string_view>
#include <utility>
#include "base/functional/bind.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_refptr.h"
#include "base/synchronization/lock.h"
#include "base/task/delay_policy.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool.h"
#include "base/thread_annotations.h"
#include "base/time/time.h"
#include "third_party/abseil-cpp/absl/functional/any_invocable.h"
#include "third_party/webrtc/api/task_queue/task_queue_base.h"
#include "third_party/webrtc/api/task_queue/task_queue_factory.h"
#include "third_party/webrtc/api/units/time_delta.h"
#include "third_party/webrtc_overrides/api/location.h"
#include "third_party/webrtc_overrides/coalesced_tasks.h"
#include "third_party/webrtc_overrides/timer_based_tick_provider.h"
namespace blink {
class WebRtcTaskQueue : public base::RefCountedThreadSafe<WebRtcTaskQueue>,
public webrtc::TaskQueueBase {
public:
explicit WebRtcTaskQueue(base::TaskTraits traits);
// webrtc::TaskQueueBase implementation.
void Delete() override;
void PostTaskImpl(absl::AnyInvocable<void() &&> task,
const PostTaskTraits& traits,
const webrtc::Location& location) override;
void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
webrtc::TimeDelta delay,
const PostDelayedTaskTraits& traits,
const webrtc::Location& location) override;
private:
friend class base::RefCountedThreadSafe<WebRtcTaskQueue>;
~WebRtcTaskQueue() override = default;
// Runs a single PostTask-task.
void RunTask(absl::AnyInvocable<void() &&> task);
// Runs all ready PostDelayedTask-tasks that have been scheduled to run at
// |scheduled_time_now|.
void MaybeRunCoalescedTasks(base::TimeTicks scheduled_time_now);
// Runs a specific high precision task.
void RunHighPrecisionTask(int id);
const scoped_refptr<base::SequencedTaskRunner> task_runner_;
// Kept during task execution to guarantee Delete semantics. Only contended
// in case both Delete and a task runs concurrently. All tasks run and get
// destroyed serially.
base::Lock alive_lock_;
// Turns to false in Delete.
bool alive_ GUARDED_BY(alive_lock_) = true;
// Low precision tasks are coalesced onto metronome ticks and stored in
// |coalesced_tasks_| until they are ready to run.
CoalescedTasks coalesced_tasks_;
};
WebRtcTaskQueue::WebRtcTaskQueue(base::TaskTraits traits)
: task_runner_(
base::ThreadPool::CreateSequencedTaskRunner(std::move(traits))) {
// This reference is eventually released by Delete being called.
AddRef();
}
void WebRtcTaskQueue::Delete() {
{
// Ensure no more tasks are going to be run.
base::AutoLock lock(alive_lock_);
alive_ = false;
// Pretend to be the current task queue and clear the other tasks. This
// works because we're always deleting or running tasks under the
// `alive_lock_`, which we keep here.
CurrentTaskQueueSetter setter(this);
coalesced_tasks_.Clear();
}
// Drop the first reference we took when creating the task queue. We are
// deleted when all closures posted to the task runner has run, or right here
// in Release().
Release();
}
void WebRtcTaskQueue::RunTask(absl::AnyInvocable<void() &&> task) {
CurrentTaskQueueSetter set_current(this);
base::AutoLock lock(alive_lock_);
if (alive_)
std::move(task)();
// Ensure task is destroyed before `set_current` goes out of scope.
task = nullptr;
}
void WebRtcTaskQueue::PostTaskImpl(absl::AnyInvocable<void() &&> task,
const PostTaskTraits& traits,
const webrtc::Location& location) {
task_runner_->PostTask(
location, base::BindOnce(&WebRtcTaskQueue::RunTask,
base::RetainedRef(this), std::move(task)));
}
void WebRtcTaskQueue::MaybeRunCoalescedTasks(
base::TimeTicks scheduled_time_now) {
base::AutoLock lock(alive_lock_);
if (alive_) {
CurrentTaskQueueSetter set_current(this);
coalesced_tasks_.RunScheduledTasks(scheduled_time_now);
}
}
void WebRtcTaskQueue::PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
webrtc::TimeDelta delay,
const PostDelayedTaskTraits& traits,
const webrtc::Location& location) {
const base::TimeTicks target_time =
base::TimeTicks::Now() + base::Microseconds(delay.us());
const base::TimeTicks snapped_target_time =
TimerBasedTickProvider::TimeSnappedToNextTick(
target_time, TimerBasedTickProvider::kDefaultPeriod);
if (!traits.high_precision &&
coalesced_tasks_.QueueDelayedTask(target_time, std::move(task),
snapped_target_time)) {
task_runner_->PostDelayedTaskAt(
base::subtle::PostDelayedTaskPassKey(), location,
base::BindOnce(&WebRtcTaskQueue::MaybeRunCoalescedTasks,
base::RetainedRef(this), snapped_target_time),
snapped_target_time, base::subtle::DelayPolicy::kPrecise);
} else if (traits.high_precision) {
task_runner_->PostDelayedTaskAt(
base::subtle::PostDelayedTaskPassKey(), location,
base::BindOnce(&WebRtcTaskQueue::RunTask, base::RetainedRef(this),
std::move(task)),
target_time, base::subtle::DelayPolicy::kPrecise);
}
}
namespace {
base::TaskTraits TaskQueuePriority2Traits(
webrtc::TaskQueueFactory::Priority priority) {
// The content/renderer/media/webrtc/rtc_video_encoder.* code
// employs a PostTask/Wait pattern that uses TQ in a way that makes it
// blocking and synchronous, which is why we allow WithBaseSyncPrimitives()
// for OS_ANDROID.
// The libvpx threading adapters also need to wait for an event.
switch (priority) {
case webrtc::TaskQueueFactory::Priority::HIGH:
#if defined(OS_ANDROID)
return {base::MayBlock(), base::WithBaseSyncPrimitives(),
base::TaskPriority::HIGHEST};
#else
return {base::MayBlock(), base::TaskPriority::HIGHEST};
#endif
case webrtc::TaskQueueFactory::Priority::LOW:
return {base::MayBlock(), base::TaskPriority::BEST_EFFORT};
case webrtc::TaskQueueFactory::Priority::NORMAL:
default:
#if defined(OS_ANDROID)
return {base::MayBlock(), base::WithBaseSyncPrimitives()};
#else
// On Windows, software encoders need to map HW frames which requires
// blocking calls.
// The libvpx threading adapters also need to wait for an event.
return {base::MayBlock()};
#endif
}
}
std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
CreateTaskQueueHelper(webrtc::TaskQueueFactory::Priority priority) {
return std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>(
new WebRtcTaskQueue(TaskQueuePriority2Traits(priority)));
}
class WebrtcTaskQueueFactory final : public webrtc::TaskQueueFactory {
public:
std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
CreateTaskQueue(std::string_view name, Priority priority) const override {
return CreateTaskQueueHelper(priority);
}
};
} // namespace
} // namespace blink
std::unique_ptr<webrtc::TaskQueueFactory> CreateWebRtcTaskQueueFactory() {
return std::unique_ptr<webrtc::TaskQueueFactory>(
new blink::WebrtcTaskQueueFactory());
}
std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
CreateWebRtcTaskQueue(webrtc::TaskQueueFactory::Priority priority) {
return blink::CreateTaskQueueHelper(priority);
}
|