File: task_runner.h

package info (click to toggle)
android-platform-tools 34.0.5-12
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 150,900 kB
  • sloc: cpp: 805,786; java: 293,500; ansic: 128,288; xml: 127,491; python: 41,481; sh: 14,245; javascript: 9,665; cs: 3,846; asm: 2,049; makefile: 1,917; yacc: 440; awk: 368; ruby: 183; sql: 140; perl: 88; lex: 67
file content (149 lines) | stat: -rw-r--r-- 5,495 bytes parent folder | download | duplicates (10)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef PLATFORM_IMPL_TASK_RUNNER_H_
#define PLATFORM_IMPL_TASK_RUNNER_H_

#include <condition_variable>  // NOLINT
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/types/optional.h"
#include "platform/api/task_runner.h"
#include "platform/api/time.h"
#include "platform/base/error.h"
#include "util/trace_logging.h"

namespace openscreen {

// TaskRunner implementation that keeps a queue of immediate tasks plus a
// time-ordered collection of delayed tasks, and executes them on whichever
// thread calls RunUntilStopped() or RunUntilSignaled().
class TaskRunnerImpl final : public TaskRunner {
 public:
  using Task = TaskRunner::Task;

  // Optional hook that replaces the built-in condition variable as the
  // mechanism used to put the run loop to sleep and to wake it up again (see
  // |task_waiter_| below).
  class TaskWaiter {
   public:
    virtual ~TaskWaiter() = default;

    // These calls should be thread-safe.  The absolute minimum is that
    // OnTaskPosted must be safe to call from another thread while this is
    // inside WaitForTaskToBePosted.  NOTE: There may be spurious wakeups from
    // WaitForTaskToBePosted depending on whether the specific implementation
    // chooses to clear queued WakeUps before entering WaitForTaskToBePosted.

    // Blocks until some event occurs, which means new tasks may have been
    // posted.  Wait may only block up to |timeout|, where a |timeout| of 0
    // means "do not block at all" (it does NOT mean "block forever").
    virtual Error WaitForTaskToBePosted(Clock::duration timeout) = 0;

    // If a WaitForTaskToBePosted call is currently blocking, unblock it
    // immediately.
    virtual void OnTaskPosted() = 0;
  };

  // |now_function| supplies the current time used for task timing.
  // |event_waiter| is optional; when it is nullptr (the default) the run loop
  // sleeps on an internal condition variable instead.  |waiter_timeout| is
  // only relevant when an |event_waiter| is provided and defaults to 100ms.
  // NOTE(review): |event_waiter| is held as a raw pointer, so it presumably is
  // not owned and must outlive this object -- confirm against the
  // implementation file.
  explicit TaskRunnerImpl(
      ClockNowFunctionPtr now_function,
      TaskWaiter* event_waiter = nullptr,
      Clock::duration waiter_timeout = std::chrono::milliseconds(100));

  // TaskRunner overrides
  ~TaskRunnerImpl() final;
  void PostPackagedTask(Task task) final;
  void PostPackagedTaskWithDelay(Task task, Clock::duration delay) final;
  bool IsRunningOnTaskRunner() final;

  // Blocks the current thread, executing tasks from the queue with the desired
  // timing; and does not return until some time after RequestStopSoon() is
  // called.
  void RunUntilStopped();

  // Blocks the current thread, executing tasks from the queue with the desired
  // timing; and does not return until some time after the current process is
  // signaled with SIGINT or SIGTERM, or after RequestStopSoon() is called.
  void RunUntilSignaled();

  // Thread-safe method for requesting the TaskRunner to stop running after all
  // non-delayed tasks in the queue have run. This behavior allows final
  // clean-up tasks to be executed before the TaskRunner stops.
  //
  // If any non-delayed tasks post additional non-delayed tasks, those will be
  // run as well before returning.
  void RequestStopSoon();

 private:
#if defined(ENABLE_TRACE_LOGGING)
  // Wrapper around a Task used to store the TraceId Metadata along with the
  // task itself, and to set the current TraceIdHierarchy before executing the
  // task.
  class TaskWithMetadata {
   public:
    // NOTE: 'explicit' keyword omitted so that the converting constructor can
    // be used implicitly. This simplifies switching between 'Task' and
    // 'TaskWithMetadata' based on the compilation flag.
    TaskWithMetadata(Task task)  // NOLINT
        : task_(std::move(task)), trace_ids_(TRACE_HIERARCHY) {}

    // Restores the trace hierarchy captured at construction, then invokes the
    // wrapped task as an rvalue.
    void operator()() {
      TRACE_SET_HIERARCHY(trace_ids_);
      std::move(task_)();
    }

   private:
    Task task_;
    TraceIdHierarchy trace_ids_;
  };
#else   // !defined(ENABLE_TRACE_LOGGING)
  using TaskWithMetadata = Task;
#endif  // defined(ENABLE_TRACE_LOGGING)

  // Helper that runs all tasks in |running_tasks_| and then clears it.
  void RunRunnableTasks();

  // Look at all tasks in the delayed task queue, then schedule them if the
  // minimum delay time has elapsed.
  void ScheduleDelayedTasks();

  // Transfers all ready-to-run tasks from |tasks_| to |running_tasks_|. If
  // there are no ready-to-run tasks, and |is_running_| is true, this method
  // will block waiting for new tasks. Returns true if any tasks were
  // transferred.
  bool GrabMoreRunnableTasks();

  const ClockNowFunctionPtr now_function_;

  // Flag that indicates whether the task runner loop should continue. This is
  // only meant to be read/written on the thread executing RunUntilStopped().
  // Defaults to false so the flag never holds an indeterminate value, no
  // matter which members a constructor chooses to initialize explicitly.
  bool is_running_ = false;

  // This mutex guards |tasks_| and |delayed_tasks_|. It is also the mutex
  // paired with |run_loop_wakeup_| when the run loop sleeps while waiting for
  // a task to be added to the queue.
  std::mutex task_mutex_;
  std::vector<TaskWithMetadata> tasks_ GUARDED_BY(task_mutex_);
  std::multimap<Clock::time_point, TaskWithMetadata> delayed_tasks_
      GUARDED_BY(task_mutex_);

  // When |task_waiter_| is nullptr, |run_loop_wakeup_| is used for sleeping the
  // task runner.  Otherwise, |run_loop_wakeup_| isn't used and |task_waiter_|
  // is used instead (along with |waiter_timeout_|).
  std::condition_variable run_loop_wakeup_;
  TaskWaiter* const task_waiter_;
  Clock::duration waiter_timeout_;

  // To prevent excessive re-allocation of the underlying array of the |tasks_|
  // vector, use an A/B vector-swap mechanism. |running_tasks_| starts out
  // empty, and is swapped with |tasks_| when it is time to run the Tasks.
  std::vector<TaskWithMetadata> running_tasks_;

  // NOTE(review): presumably records the thread executing the run loop so that
  // IsRunningOnTaskRunner() can compare against it -- confirm against the
  // implementation file.
  std::thread::id task_runner_thread_id_;

  OSP_DISALLOW_COPY_AND_ASSIGN(TaskRunnerImpl);
};
}  // namespace openscreen

#endif  // PLATFORM_IMPL_TASK_RUNNER_H_