// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef PLATFORM_IMPL_TASK_RUNNER_H_
#define PLATFORM_IMPL_TASK_RUNNER_H_
#include <condition_variable> // NOLINT
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/types/optional.h"
#include "platform/api/task_runner.h"
#include "platform/api/time.h"
#include "platform/base/error.h"
#include "util/trace_logging.h"
namespace openscreen {
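// Example usage (a minimal sketch under stated assumptions, not a definitive
// recipe): run the loop on a dedicated thread, post work, then request a stop.
// This assumes that Clock::now is compatible with ClockNowFunctionPtr and that
// the TaskRunner base class provides a PostTask() helper; neither is declared
// in this header.
//
//   TaskRunnerImpl runner(&Clock::now);
//   std::thread run_thread([&runner] { runner.RunUntilStopped(); });
//   runner.PostTask([] { /* Runs on |run_thread|. */ });
//   runner.RequestStopSoon();  // Non-delayed tasks already queued still run.
//   run_thread.join();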
class TaskRunnerImpl final : public TaskRunner {
public:
using Task = TaskRunner::Task;
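// Example (a minimal sketch, not part of this codebase): one way the
// TaskWaiter interface below might be implemented with a condition variable.
// |CvTaskWaiter| and its members are hypothetical names, and Error::None() is
// assumed to be the success value of Error.
//
//   class CvTaskWaiter final : public TaskRunnerImpl::TaskWaiter {
//    public:
//     Error WaitForTaskToBePosted(Clock::duration timeout) override {
//       std::unique_lock<std::mutex> lock(mutex_);
//       // Returns early if OnTaskPosted() was called before or during the
//       // wait; otherwise gives up after |timeout|.
//       cv_.wait_for(lock, timeout, [this] { return posted_; });
//       posted_ = false;
//       return Error::None();
//     }
//     void OnTaskPosted() override {
//       {
//         std::lock_guard<std::mutex> lock(mutex_);
//         posted_ = true;
//       }
//       cv_.notify_one();
//     }
//
//    private:
//     std::mutex mutex_;
//     std::condition_variable cv_;
//     bool posted_ = false;
//   };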
class TaskWaiter {
public:
virtual ~TaskWaiter() = default;
// These calls should be thread-safe. At a minimum, OnTaskPosted must be
// safe to call from another thread while this object is inside
// WaitForTaskToBePosted. NOTE: WaitForTaskToBePosted may wake up spuriously,
// depending on whether the specific implementation chooses to clear queued
// wake-ups before entering WaitForTaskToBePosted.
//
// Blocks until some event occurs, which means new tasks may have been
// posted. The wait blocks for at most |timeout|, where 0 means do not block
// at all (not block forever).
virtual Error WaitForTaskToBePosted(Clock::duration timeout) = 0;
// If a WaitForTaskToBePosted call is currently blocking, unblock it
// immediately.
virtual void OnTaskPosted() = 0;
};
explicit TaskRunnerImpl(
ClockNowFunctionPtr now_function,
TaskWaiter* event_waiter = nullptr,
Clock::duration waiter_timeout = std::chrono::milliseconds(100));
// TaskRunner overrides
~TaskRunnerImpl() final;
void PostPackagedTask(Task task) final;
void PostPackagedTaskWithDelay(Task task, Clock::duration delay) final;
bool IsRunningOnTaskRunner() final;
// Blocks the current thread, executing tasks from the queue with the desired
// timing, and does not return until some time after RequestStopSoon() is
// called.
void RunUntilStopped();
// Blocks the current thread, executing tasks from the queue with the desired
// timing, and does not return until some time after the current process is
// signaled with SIGINT or SIGTERM, or after RequestStopSoon() is called.
void RunUntilSignaled();
// Thread-safe method for requesting the TaskRunner to stop running after all
// non-delayed tasks in the queue have run. This behavior allows final
// clean-up tasks to be executed before the TaskRunner stops.
//
// If any non-delayed tasks post additional non-delayed tasks, those will be
// run as well before returning.
void RequestStopSoon();
private:
#if defined(ENABLE_TRACE_LOGGING)
// Wrapper around a Task used to store the TraceId Metadata along with the
// task itself, and to set the current TraceIdHierarchy before executing the
// task.
class TaskWithMetadata {
public:
// NOTE: The 'explicit' keyword is omitted so that the converting constructor
// can be used. This simplifies switching between 'Task' and
// 'TaskWithMetadata' based on the compilation flag.
TaskWithMetadata(Task task) // NOLINT
: task_(std::move(task)), trace_ids_(TRACE_HIERARCHY) {}
void operator()() {
TRACE_SET_HIERARCHY(trace_ids_);
std::move(task_)();
}
private:
Task task_;
TraceIdHierarchy trace_ids_;
};
#else // !defined(ENABLE_TRACE_LOGGING)
using TaskWithMetadata = Task;
#endif // defined(ENABLE_TRACE_LOGGING)
// Helper that runs all tasks in |running_tasks_| and then clears it.
void RunRunnableTasks();
// Examines all tasks in the delayed task queue and schedules each one whose
// minimum delay time has elapsed.
void ScheduleDelayedTasks();
// Transfers all ready-to-run tasks from |tasks_| to |running_tasks_|. If
// there are no ready-to-run tasks, and |is_running_| is true, this method
// will block waiting for new tasks. Returns true if any tasks were
// transferred.
bool GrabMoreRunnableTasks();
const ClockNowFunctionPtr now_function_;
// Flag that indicates whether the task runner loop should continue. This is
// only meant to be read/written on the thread executing RunUntilStopped().
bool is_running_;
// This mutex guards |tasks_| and |delayed_tasks_|. It is also used with
// |run_loop_wakeup_| to wake the run loop while it is waiting for a task to
// be added to the queue.
std::mutex task_mutex_;
std::vector<TaskWithMetadata> tasks_ GUARDED_BY(task_mutex_);
std::multimap<Clock::time_point, TaskWithMetadata> delayed_tasks_
GUARDED_BY(task_mutex_);
// When |task_waiter_| is nullptr, |run_loop_wakeup_| is used for sleeping the
// task runner. Otherwise, |run_loop_wakeup_| isn't used and |task_waiter_|
// is used instead (along with |waiter_timeout_|).
std::condition_variable run_loop_wakeup_;
TaskWaiter* const task_waiter_;
Clock::duration waiter_timeout_;
// To prevent excessive re-allocation of the underlying array of the |tasks_|
// vector, use an A/B vector-swap mechanism. |running_tasks_| starts out
// empty, and is swapped with |tasks_| when it is time to run the Tasks.
std::vector<TaskWithMetadata> running_tasks_;
std::thread::id task_runner_thread_id_;
OSP_DISALLOW_COPY_AND_ASSIGN(TaskRunnerImpl);
};
} // namespace openscreen
#endif // PLATFORM_IMPL_TASK_RUNNER_H_