File: task_queue_factory.cc

package info (click to toggle)
chromium 139.0.7258.127-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 6,122,068 kB
  • sloc: cpp: 35,100,771; ansic: 7,163,530; javascript: 4,103,002; python: 1,436,920; asm: 946,517; xml: 746,709; pascal: 187,653; perl: 88,691; sh: 88,436; objc: 79,953; sql: 51,488; cs: 44,583; fortran: 24,137; makefile: 22,147; tcl: 15,277; php: 13,980; yacc: 8,984; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (207 lines) | stat: -rw-r--r-- 7,941 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "third_party/webrtc_overrides/task_queue_factory.h"

#include <memory>
#include <string_view>
#include <utility>

#include "base/functional/bind.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_refptr.h"
#include "base/synchronization/lock.h"
#include "base/task/delay_policy.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool.h"
#include "base/thread_annotations.h"
#include "base/time/time.h"
#include "third_party/abseil-cpp/absl/functional/any_invocable.h"
#include "third_party/webrtc/api/task_queue/task_queue_base.h"
#include "third_party/webrtc/api/task_queue/task_queue_factory.h"
#include "third_party/webrtc/api/units/time_delta.h"
#include "third_party/webrtc_overrides/api/location.h"
#include "third_party/webrtc_overrides/coalesced_tasks.h"
#include "third_party/webrtc_overrides/timer_based_tick_provider.h"

namespace blink {

// A webrtc::TaskQueueBase implementation that runs its tasks on a
// base::SequencedTaskRunner.
//
// Lifetime: the object is ref-counted. The constructor takes one reference on
// behalf of the owner, which Delete() drops again. Closures posted to
// `task_runner_` hold references of their own, so the object is destroyed
// either in Delete() or once the last posted closure has gone away.
class WebRtcTaskQueue : public base::RefCountedThreadSafe<WebRtcTaskQueue>,
                        public webrtc::TaskQueueBase {
 public:
  // Creates the queue on top of a sequenced task runner configured with
  // `traits`.
  explicit WebRtcTaskQueue(base::TaskTraits traits);

  // webrtc::TaskQueueBase implementation.
  // Delete() stops any further task from running and drops the owner's
  // reference.
  void Delete() override;
  void PostTaskImpl(absl::AnyInvocable<void() &&> task,
                    const PostTaskTraits& traits,
                    const webrtc::Location& location) override;
  void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
                           webrtc::TimeDelta delay,
                           const PostDelayedTaskTraits& traits,
                           const webrtc::Location& location) override;

 private:
  friend class base::RefCountedThreadSafe<WebRtcTaskQueue>;
  // Private so that destruction only happens through the reference count.
  ~WebRtcTaskQueue() override = default;

  // Runs a single PostTask-task (also used for high precision delayed tasks).
  void RunTask(absl::AnyInvocable<void() &&> task);
  // Runs all ready PostDelayedTask-tasks that have been scheduled to run at
  // |scheduled_time_now|.
  void MaybeRunCoalescedTasks(base::TimeTicks scheduled_time_now);

  const scoped_refptr<base::SequencedTaskRunner> task_runner_;

  // Kept during task execution to guarantee Delete semantics. Only contended
  // in case both Delete and a task runs concurrently. All tasks run and get
  // destroyed serially.
  base::Lock alive_lock_;
  // Turns to false in Delete.
  bool alive_ GUARDED_BY(alive_lock_) = true;

  // Low precision tasks are coalesced onto metronome ticks and stored in
  // |coalesced_tasks_| until they are ready to run.
  CoalescedTasks coalesced_tasks_;
};

WebRtcTaskQueue::WebRtcTaskQueue(base::TaskTraits traits)
    : task_runner_(
          base::ThreadPool::CreateSequencedTaskRunner(std::move(traits))) {
  // This reference is eventually released by Delete being called.
  AddRef();
}

// Shuts the queue down: marks it dead, destroys the pending coalesced tasks
// and gives up the reference taken in the constructor.
void WebRtcTaskQueue::Delete() {
  {
    base::AutoLock alive_guard(alive_lock_);
    // From now on RunTask() and MaybeRunCoalescedTasks() see `alive_ == false`
    // and will not run anything.
    alive_ = false;

    // Clear the remaining coalesced tasks while posing as the current task
    // queue. This is sound because tasks are only ever run or deleted while
    // `alive_lock_` is held, and it is held here.
    CurrentTaskQueueSetter pose_as_current(this);
    coalesced_tasks_.Clear();
  }
  // Drop the reference taken when the task queue was created. The object is
  // deleted once all closures posted to the task runner have run, or right
  // here in Release(). The scope above has already ended, so `alive_lock_` is
  // no longer held at this point.
  Release();
}

// Executes `task` as this task queue, unless Delete() has already been called,
// in which case the task is only destroyed.
void WebRtcTaskQueue::RunTask(absl::AnyInvocable<void() &&> task) {
  // Declared before the lock so that it outlives both the lock and the
  // explicit task reset below.
  CurrentTaskQueueSetter current_queue_scope(this);
  base::AutoLock alive_guard(alive_lock_);
  if (alive_) {
    std::move(task)();
  }
  // Reset the task explicitly so that whatever it owns is destroyed before
  // `current_queue_scope` goes out of scope.
  task = nullptr;
}

// Posts `task` to the underlying task runner, wrapped in RunTask(). `traits`
// is not consulted.
void WebRtcTaskQueue::PostTaskImpl(absl::AnyInvocable<void() &&> task,
                                   const PostTaskTraits& traits,
                                   const webrtc::Location& location) {
  // The closure is bound with base::RetainedRef(this), so the queue stays
  // referenced until the closure is gone.
  auto run_closure = base::BindOnce(&WebRtcTaskQueue::RunTask,
                                    base::RetainedRef(this), std::move(task));
  task_runner_->PostTask(location, std::move(run_closure));
}

// Runs the coalesced tasks scheduled for `scheduled_time_now`, unless Delete()
// has already been called.
void WebRtcTaskQueue::MaybeRunCoalescedTasks(
    base::TimeTicks scheduled_time_now) {
  base::AutoLock alive_guard(alive_lock_);
  if (!alive_) {
    // Delete() already cleared the coalesced tasks; nothing left to do.
    return;
  }
  // The tasks execute with this queue installed as the current one and with
  // `alive_lock_` held.
  CurrentTaskQueueSetter current_queue_scope(this);
  coalesced_tasks_.RunScheduledTasks(scheduled_time_now);
}

// Schedules `task` to run after `delay`.
//
// High precision tasks are posted individually at their exact target time.
// All other tasks are handed to `coalesced_tasks_` and run from
// MaybeRunCoalescedTasks() at the tick their target time snaps to.
void WebRtcTaskQueue::PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
                                          webrtc::TimeDelta delay,
                                          const PostDelayedTaskTraits& traits,
                                          const webrtc::Location& location) {
  const base::TimeTicks run_at =
      base::TimeTicks::Now() + base::Microseconds(delay.us());
  const base::TimeTicks tick_time =
      TimerBasedTickProvider::TimeSnappedToNextTick(
          run_at, TimerBasedTickProvider::kDefaultPeriod);

  if (traits.high_precision) {
    // Bypass coalescing entirely: one posted closure per task, at `run_at`.
    task_runner_->PostDelayedTaskAt(
        base::subtle::PostDelayedTaskPassKey(), location,
        base::BindOnce(&WebRtcTaskQueue::RunTask, base::RetainedRef(this),
                       std::move(task)),
        run_at, base::subtle::DelayPolicy::kPrecise);
    return;
  }

  // The task is now owned by `coalesced_tasks_`. A false return means no new
  // closure has to be posted (presumably because one is already pending for
  // this tick - confirm against CoalescedTasks::QueueDelayedTask).
  const bool needs_tick_callback =
      coalesced_tasks_.QueueDelayedTask(run_at, std::move(task), tick_time);
  if (!needs_tick_callback) {
    return;
  }

  task_runner_->PostDelayedTaskAt(
      base::subtle::PostDelayedTaskPassKey(), location,
      base::BindOnce(&WebRtcTaskQueue::MaybeRunCoalescedTasks,
                     base::RetainedRef(this), tick_time),
      tick_time, base::subtle::DelayPolicy::kPrecise);
}

namespace {

// Maps a WebRTC task queue priority onto the base::TaskTraits used to create
// the backing sequenced task runner. Every priority gets base::MayBlock().
//
// NOTE(review): the Android branches are keyed on `OS_ANDROID`, and this file
// does not include build/build_config.h. Confirm that the macro is actually
// defined for Android builds; otherwise only the #else branches are ever
// compiled.
base::TaskTraits TaskQueuePriority2Traits(
    webrtc::TaskQueueFactory::Priority priority) {
  using Priority = webrtc::TaskQueueFactory::Priority;

  // The content/renderer/media/webrtc/rtc_video_encoder.* code uses a
  // PostTask/Wait pattern that makes the task queue blocking and synchronous,
  // which is why WithBaseSyncPrimitives() is allowed for OS_ANDROID.
  // The libvpx threading adapters also need to wait for an event.
  if (priority == Priority::HIGH) {
#if defined(OS_ANDROID)
    return {base::MayBlock(), base::WithBaseSyncPrimitives(),
            base::TaskPriority::HIGHEST};
#else
    return {base::MayBlock(), base::TaskPriority::HIGHEST};
#endif
  }

  if (priority == Priority::LOW) {
    return {base::MayBlock(), base::TaskPriority::BEST_EFFORT};
  }

  // Priority::NORMAL, and any value not handled above.
#if defined(OS_ANDROID)
  return {base::MayBlock(), base::WithBaseSyncPrimitives()};
#else
  // On Windows, software encoders need to map HW frames which requires
  // blocking calls.
  // The libvpx threading adapters also need to wait for an event.
  return {base::MayBlock()};
#endif
}

// Creates a WebRtcTaskQueue whose task runner traits are derived from
// `priority`. The returned pointer uses webrtc::TaskQueueDeleter, not plain
// `delete`, to dispose of the queue.
std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
CreateTaskQueueHelper(webrtc::TaskQueueFactory::Priority priority) {
  using TaskQueuePtr =
      std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>;

  const base::TaskTraits runner_traits = TaskQueuePriority2Traits(priority);
  // std::make_unique cannot produce a unique_ptr with a custom deleter, hence
  // the explicit `new`.
  return TaskQueuePtr(new WebRtcTaskQueue(runner_traits));
}

// webrtc::TaskQueueFactory that hands out WebRtcTaskQueue instances.
class WebrtcTaskQueueFactory final : public webrtc::TaskQueueFactory {
 public:
  // Creates a task queue for `priority`. `name` is accepted to satisfy the
  // interface but is not used.
  std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
  CreateTaskQueue(std::string_view name, Priority priority) const override {
    auto task_queue = CreateTaskQueueHelper(priority);
    return task_queue;
  }
};

}  // namespace

}  // namespace blink

std::unique_ptr<webrtc::TaskQueueFactory> CreateWebRtcTaskQueueFactory() {
  return std::unique_ptr<webrtc::TaskQueueFactory>(
      new blink::WebrtcTaskQueueFactory());
}

// Creates a single task queue for `priority` directly, without going through
// a factory object.
std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
CreateWebRtcTaskQueue(webrtc::TaskQueueFactory::Priority priority) {
  auto task_queue = blink::CreateTaskQueueHelper(priority);
  return task_queue;
}