1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once
#include <functional>
#include <memory>
#include <queue>
#include <unordered_map>
#include <utility>
#include <vector>
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
#include "util/mutexlock.h"
namespace ROCKSDB_NAMESPACE {
// A Timer class to handle repeated work.
//
// `Start()` and `Shutdown()` are currently not thread-safe. The client must
// serialize calls to these two member functions.
//
// A single timer instance can handle multiple functions via a single thread.
// It is better to leave long running work to a dedicated thread pool.
//
// Timer can be started by calling `Start()`, and ended by calling `Shutdown()`.
// Work (in terms of a `void function`) can be scheduled by calling `Add` with
// a unique function name and de-scheduled by calling `Cancel`.
// Many functions can be added.
//
// Impl Details:
// A heap is used to keep track of when the next timer goes off.
// A map from a function name to the function keeps track of all the functions.
class Timer {
public:
explicit Timer(SystemClock* clock)
: clock_(clock),
mutex_(clock),
cond_var_(&mutex_),
running_(false),
executing_task_(false) {}
~Timer() { Shutdown(); }
// Add a new function to run.
// fn_name has to be identical, otherwise it will fail to add and return false
// start_after_us is the initial delay.
// repeat_every_us is the interval between ending time of the last call and
// starting time of the next call. For example, repeat_every_us = 2000 and
// the function takes 1000us to run. If it starts at time [now]us, then it
// finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us.
// repeat_every_us == 0 means do not repeat.
bool Add(std::function<void()> fn, const std::string& fn_name,
uint64_t start_after_us, uint64_t repeat_every_us) {
auto fn_info = std::make_unique<FunctionInfo>(std::move(fn), fn_name, 0,
repeat_every_us);
InstrumentedMutexLock l(&mutex_);
// Assign time within mutex to make sure the next_run_time is larger than
// the current running one
fn_info->next_run_time_us = clock_->NowMicros() + start_after_us;
// the new task start time should never before the current task executing
// time, as the executing task can only be running if it's next_run_time_us
// is due (<= clock_->NowMicros()).
if (executing_task_ &&
fn_info->next_run_time_us < heap_.top()->next_run_time_us) {
return false;
}
auto it = map_.find(fn_name);
if (it == map_.end()) {
heap_.push(fn_info.get());
map_.try_emplace(fn_name, std::move(fn_info));
} else {
// timer doesn't support duplicated function name
return false;
}
cond_var_.SignalAll();
return true;
}
void Cancel(const std::string& fn_name) {
InstrumentedMutexLock l(&mutex_);
// Mark the function with fn_name as invalid so that it will not be
// requeued.
auto it = map_.find(fn_name);
if (it != map_.end() && it->second) {
it->second->Cancel();
}
// If the currently running function is fn_name, then we need to wait
// until it finishes before returning to caller.
while (!heap_.empty() && executing_task_) {
FunctionInfo* func_info = heap_.top();
assert(func_info);
if (func_info->name == fn_name) {
WaitForTaskCompleteIfNecessary();
} else {
break;
}
}
}
void CancelAll() {
InstrumentedMutexLock l(&mutex_);
CancelAllWithLock();
}
// Start the Timer
bool Start() {
InstrumentedMutexLock l(&mutex_);
if (running_) {
return false;
}
running_ = true;
thread_ = std::make_unique<port::Thread>(&Timer::Run, this);
return true;
}
// Shutdown the Timer
bool Shutdown() {
{
InstrumentedMutexLock l(&mutex_);
if (!running_) {
return false;
}
running_ = false;
CancelAllWithLock();
cond_var_.SignalAll();
}
if (thread_) {
thread_->join();
}
return true;
}
bool HasPendingTask() const {
InstrumentedMutexLock l(&mutex_);
for (const auto& fn_info : map_) {
if (fn_info.second->IsValid()) {
return true;
}
}
return false;
}
#ifndef NDEBUG
// Wait until Timer starting waiting, call the optional callback, then wait
// for Timer waiting again.
// Tests can provide a custom Clock object to mock time, and use the callback
// here to bump current time and trigger Timer. See timer_test for example.
//
// Note: only support one caller of this method.
void TEST_WaitForRun(const std::function<void()>& callback = nullptr) {
InstrumentedMutexLock l(&mutex_);
// It act as a spin lock
while (executing_task_ ||
(!heap_.empty() &&
heap_.top()->next_run_time_us <= clock_->NowMicros())) {
cond_var_.TimedWait(clock_->NowMicros() + 1000);
}
if (callback != nullptr) {
callback();
}
cond_var_.SignalAll();
do {
cond_var_.TimedWait(clock_->NowMicros() + 1000);
} while (executing_task_ ||
(!heap_.empty() &&
heap_.top()->next_run_time_us <= clock_->NowMicros()));
}
size_t TEST_GetPendingTaskNum() const {
InstrumentedMutexLock l(&mutex_);
size_t ret = 0;
for (const auto& fn_info : map_) {
if (fn_info.second->IsValid()) {
ret++;
}
}
return ret;
}
void TEST_OverrideTimer(SystemClock* clock) {
InstrumentedMutexLock l(&mutex_);
clock_ = clock;
}
#endif // NDEBUG
private:
void Run() {
InstrumentedMutexLock l(&mutex_);
while (running_) {
if (heap_.empty()) {
// wait
TEST_SYNC_POINT("Timer::Run::Waiting");
cond_var_.Wait();
continue;
}
FunctionInfo* current_fn = heap_.top();
assert(current_fn);
if (!current_fn->IsValid()) {
heap_.pop();
map_.erase(current_fn->name);
continue;
}
if (current_fn->next_run_time_us <= clock_->NowMicros()) {
// make a copy of the function so it won't be changed after
// mutex_.unlock.
std::function<void()> fn = current_fn->fn;
executing_task_ = true;
mutex_.Unlock();
// Execute the work
fn();
mutex_.Lock();
executing_task_ = false;
cond_var_.SignalAll();
// Remove the work from the heap once it is done executing, make sure
// it's the same function after executing the work while mutex is
// released.
// Note that we are just removing the pointer from the heap. Its
// memory is still managed in the map (as it holds a unique ptr).
// So current_fn is still a valid ptr.
assert(heap_.top() == current_fn);
heap_.pop();
// current_fn may be cancelled already.
if (current_fn->IsValid() && current_fn->repeat_every_us > 0) {
assert(running_);
current_fn->next_run_time_us =
clock_->NowMicros() + current_fn->repeat_every_us;
// Schedule new work into the heap with new time.
heap_.push(current_fn);
} else {
// if current_fn is cancelled or no need to repeat, remove it from the
// map to avoid leak.
map_.erase(current_fn->name);
}
} else {
cond_var_.TimedWait(current_fn->next_run_time_us);
}
}
}
void CancelAllWithLock() {
mutex_.AssertHeld();
if (map_.empty() && heap_.empty()) {
return;
}
// With mutex_ held, set all tasks to invalid so that they will not be
// re-queued.
for (auto& elem : map_) {
auto& func_info = elem.second;
assert(func_info);
func_info->Cancel();
}
// WaitForTaskCompleteIfNecessary() may release mutex_
WaitForTaskCompleteIfNecessary();
while (!heap_.empty()) {
heap_.pop();
}
map_.clear();
}
// A wrapper around std::function to keep track when it should run next
// and at what frequency.
struct FunctionInfo {
// the actual work
std::function<void()> fn;
// name of the function
std::string name;
// when the function should run next
uint64_t next_run_time_us;
// repeat interval
uint64_t repeat_every_us;
// controls whether this function is valid.
// A function is valid upon construction and until someone explicitly
// calls `Cancel()`.
bool valid;
FunctionInfo(std::function<void()>&& _fn, std::string _name,
const uint64_t _next_run_time_us, uint64_t _repeat_every_us)
: fn(std::move(_fn)),
name(std::move(_name)),
next_run_time_us(_next_run_time_us),
repeat_every_us(_repeat_every_us),
valid(true) {}
void Cancel() { valid = false; }
bool IsValid() const { return valid; }
};
void WaitForTaskCompleteIfNecessary() {
mutex_.AssertHeld();
while (executing_task_) {
TEST_SYNC_POINT("Timer::WaitForTaskCompleteIfNecessary:TaskExecuting");
cond_var_.Wait();
}
}
struct RunTimeOrder {
bool operator()(const FunctionInfo* f1, const FunctionInfo* f2) {
return f1->next_run_time_us > f2->next_run_time_us;
}
};
SystemClock* clock_;
// This mutex controls both the heap_ and the map_. It needs to be held for
// making any changes in them.
mutable InstrumentedMutex mutex_;
InstrumentedCondVar cond_var_;
std::unique_ptr<port::Thread> thread_;
bool running_;
bool executing_task_;
std::priority_queue<FunctionInfo*, std::vector<FunctionInfo*>, RunTimeOrder>
heap_;
// In addition to providing a mapping from a function name to a function,
// it is also responsible for memory management.
std::unordered_map<std::string, std::unique_ptr<FunctionInfo>> map_;
};
} // namespace ROCKSDB_NAMESPACE
|