File: thread_wrapper.h

package info (click to toggle)
chromium 138.0.7204.157-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 6,071,864 kB
  • sloc: cpp: 34,936,859; ansic: 7,176,967; javascript: 4,110,704; python: 1,419,953; asm: 946,768; xml: 739,967; pascal: 187,324; sh: 89,623; perl: 88,663; objc: 79,944; sql: 50,304; cs: 41,786; fortran: 24,137; makefile: 21,806; php: 13,980; tcl: 13,166; yacc: 8,925; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (168 lines) | stat: -rw-r--r-- 6,657 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
// Copyright 2012 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef COMPONENTS_WEBRTC_THREAD_WRAPPER_H_
#define COMPONENTS_WEBRTC_THREAD_WRAPPER_H_

#include <stdint.h>

#include <list>
#include <map>
#include <memory>
#include <optional>

#include "base/auto_reset.h"
#include "base/compiler_specific.h"
#include "base/feature_list.h"
#include "base/functional/callback_forward.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/current_thread.h"
#include "base/task/single_thread_task_runner.h"
#include "base/time/time.h"
#include "third_party/webrtc/rtc_base/thread.h"
#include "third_party/webrtc_overrides/api/location.h"
#include "third_party/webrtc_overrides/coalesced_tasks.h"

namespace webrtc {

// ThreadWrapper implements webrtc::Thread interface on top of
// Chromium's SingleThreadTaskRunner interface. Currently only the bare minimum
// that is used by P2P part of libjingle is implemented. There are two ways to
// create this object:
//
// - Call EnsureForCurrentMessageLoop(). This approach works only on threads
//   that have MessageLoop In this case ThreadWrapper deletes itself
//   automatically when MessageLoop is destroyed.
// - Using ThreadWrapper() constructor. In this case the creating code
//   must pass a valid task runner for the current thread and also delete the
//   wrapper later.
class ThreadWrapper : public base::CurrentThread::DestructionObserver,
                      public webrtc::Thread {
 public:
  // A repeating callback whose TimeDelta argument indicates a duration sample.
  // What the duration represents is contextual.
  using SampledDurationCallback =
      base::RepeatingCallback<void(base::TimeDelta)>;

  // Create ThreadWrapper for the current thread if it hasn't been created
  // yet. The thread wrapper is destroyed automatically when the current
  // MessageLoop is destroyed.
  static void EnsureForCurrentMessageLoop();

  // Creates ThreadWrapper for |task_runner| that runs tasks on the
  // current thread.
  static std::unique_ptr<ThreadWrapper> WrapTaskRunner(
      ::scoped_refptr<base::SingleThreadTaskRunner> task_runner);

  // Returns thread wrapper for the current thread or nullptr if it doesn't
  // exist.
  static ThreadWrapper* current();

  // Sets task latency & duration sample callbacks intended to gather UMA
  // statistics. Samples are acquired periodically every several seconds by
  // ThreadWrapper. In this context,
  // * task latency is defined as the duration between the moment a task is
  //   scheduled from ThreadWrapper's task runner, and the moment
  //   it begins running.
  // * task duration is defined as the duration between the moment the
  //   ThreadWrapper begins running a task and the moment it ends
  //   executing it. It only measures durations of tasks posted to
  //   webrtc::Thread.
  // The passed callbacks are called in the ThreadWrapper's task runner
  // context.
  void SetLatencyAndTaskDurationCallbacks(
      SampledDurationCallback task_latency_callback,
      SampledDurationCallback task_duration_callback);

  ThreadWrapper(const ThreadWrapper&) = delete;
  ThreadWrapper& operator=(const ThreadWrapper&) = delete;

  ~ThreadWrapper() override;

  // Sets whether the thread can be used to send messages
  // synchronously to another thread using BlockingCall() method. Set to false
  // by default to avoid potential jankiness when BlockingCall() used on
  // renderer thread. It should be set explicitly for threads that
  // need to call BlockingCall() for other threads.
  void set_send_allowed(bool allowed) { send_allowed_ = allowed; }

  webrtc::SocketServer* SocketServer();

  // CurrentThread::DestructionObserver implementation.
  void WillDestroyCurrentMessageLoop() override;

  // Following methods are not supported. They are overriden just to
  // ensure that they are not called (each of them contain NOTREACHED
  // in the body). Some of this methods can be implemented if it
  // becomes necessary to use webrtc code that calls them.
  bool IsQuitting() override;
  void Quit() override;
  void Restart() override;
  int GetDelay() override;

  // webrtc::Thread overrides.
  void Stop() override;
  void Run() override;

 private:
  struct PendingSend;
  class PostTaskLatencySampler;

  explicit ThreadWrapper(
      ::scoped_refptr<base::SingleThreadTaskRunner> task_runner);

  // webrtc::Thread overrides.
  void BlockingCallImpl(webrtc::FunctionView<void()> functor,
                        const webrtc::Location& location) override;
  // TaskQueueBase overrides.
  void PostTaskImpl(absl::AnyInvocable<void() &&> task,
                    const PostTaskTraits& traits,
                    const Location& location) override;
  void PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task,
                           TimeDelta delay,
                           const PostDelayedTaskTraits& traits,
                           const Location& location) override;

  void ProcessPendingSends();

  // Executes WebRTC queued tasks from TaskQueueBase overrides on
  // |task_runner_|.
  void RunTaskQueueTask(absl::AnyInvocable<void() &&> task);
  void RunCoalescedTaskQueueTasks(base::TimeTicks scheduled_time);

  // Called before a task runs, returns an opaque optional timestamp which
  // should be passed into FinalizeRunTask.
  std::optional<base::TimeTicks> PrepareRunTask();
  // Called after a task has run. Move the return value of PrepareRunTask as
  // |task_start_timestamp|.
  void FinalizeRunTask(std::optional<base::TimeTicks> task_start_timestamp);

  const base::AutoReset<ThreadWrapper*> resetter_;

  // Task runner used to execute messages posted on this thread.
  ::scoped_refptr<base::SingleThreadTaskRunner> task_runner_;

  bool send_allowed_;

  // |lock_| must be locked when accessing |pending_send_messages_|.
  base::Lock lock_;
  std::list<raw_ptr<PendingSend, CtnExperimental>> pending_send_messages_;
  base::WaitableEvent pending_send_event_;
  std::unique_ptr<PostTaskLatencySampler> latency_sampler_;
  SampledDurationCallback task_latency_callback_;
  SampledDurationCallback task_duration_callback_;
  // Low precision tasks are coalesced onto metronome ticks and stored in
  // `coalesced_tasks_` until they are ready to run.
  blink::CoalescedTasks coalesced_tasks_;

  base::WeakPtr<ThreadWrapper> weak_ptr_;
  base::WeakPtrFactory<ThreadWrapper> weak_ptr_factory_{this};
};

}  // namespace webrtc

#endif  // COMPONENTS_WEBRTC_THREAD_WRAPPER_H_