1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
|
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_
#define COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_
#include <stddef.h>
#include <deque>
#include <memory>
#include <set>
#include <utility>
#include "base/bind.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/memory/weak_ptr.h"
#include "base/threading/non_thread_safe.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "net/base/backoff_entry.h"
namespace syncer {
// A queue that dispatches tasks, ignores duplicates, and provides backoff
// semantics.
//
// |T| is the task type.
//
// For each task added to the queue, the HandleTaskCallback will eventually be
// invoked. For each invocation, the user of TaskQueue must call exactly one of
// |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|.
//
// To retry a failed task, call MarkAsFailed(task) then AddToQueue(task).
//
// Example usage:
//
// void Handle(const Foo& foo);
// ...
// TaskQueue<Foo> queue(base::Bind(&Handle),
// base::TimeDelta::FromSeconds(1),
// base::TimeDelta::FromMinutes(1));
// ...
// {
// Foo foo;
// // Add foo to the queue. At some point, Handle will be invoked in this
// // message loop.
// queue.AddToQueue(foo);
// }
// ...
// void Handle(const Foo& foo) {
// DoSomethingWith(foo);
// // We must call one of the three methods to tell the queue how we're
// // dealing with foo. Of course, we are free to call in the context of
// // this HandleTaskCallback or outside the context if we so choose.
// if (SuccessfullyHandled(foo)) {
// queue.MarkAsSucceeded(foo);
// } else if (Failed(foo)) {
// queue.MarkAsFailed(foo);
// if (ShouldRetry(foo)) {
// queue.AddToQueue(foo);
// }
// } else {
// queue.Cancel(foo);
// }
// }
//
template <typename T>
class TaskQueue : base::NonThreadSafe {
 public:
  // A callback provided by users of the TaskQueue to handle tasks.
  //
  // The queue invokes this callback with a task to be handled. The callee is
  // expected to (eventually) call exactly one of |MarkAsSucceeded|,
  // |MarkAsFailed|, or |Cancel| to signify completion of the task.
  using HandleTaskCallback = base::Callback<void(const T&)>;

  // Construct a TaskQueue.
  //
  // |callback| the callback to be invoked for handling tasks.
  //
  // |initial_backoff_delay| the initial amount of time the queue will wait
  // before dispatching tasks after a failed task (see |MarkAsFailed|). May be
  // zero. Subsequent failures will increase the delay up to
  // |max_backoff_delay|.
  //
  // |max_backoff_delay| the maximum amount of time the queue will wait before
  // dispatching tasks. May be zero. Must be greater than or equal to
  // |initial_backoff_delay|.
  TaskQueue(const HandleTaskCallback& callback,
            const base::TimeDelta& initial_backoff_delay,
            const base::TimeDelta& max_backoff_delay);

  // Add |task| to the end of the queue.
  //
  // If |task| is already queued or currently being handled it is not added
  // again. Duplicates are detected through a std::set<T>, i.e. by the
  // equivalence induced by T's ordering (operator<), not by operator==.
  void AddToQueue(const T& task);

  // Mark |task| as completing successfully.
  //
  // Marking a task as completing successfully clears any backoff delay in
  // effect.
  //
  // May only be called after the HandleTaskCallback has been invoked with
  // |task|.
  void MarkAsSucceeded(const T& task);

  // Mark |task| as failed.
  //
  // Marking a task as failed will cause a backoff, i.e. a delay in dispatching
  // of subsequent tasks. Repeated failures will increase the delay.
  //
  // May only be called after the HandleTaskCallback has been invoked with
  // |task|.
  void MarkAsFailed(const T& task);

  // Cancel |task|.
  //
  // |task| is removed from the queue and will not be retried. Does not affect
  // the backoff delay.
  //
  // May only be called after the HandleTaskCallback has been invoked with
  // |task|.
  void Cancel(const T& task);

  // Reset any backoff delay and resume dispatching of tasks.
  //
  // Useful for when you know the cause of previous failures has been resolved
  // and you don't want to wait for the accumulated backoff delay to elapse.
  void ResetBackoff();

  // Use |timer| for scheduled events. |timer| must not be null.
  //
  // Used in tests. See also MockTimer.
  void SetTimerForTest(std::unique_ptr<base::Timer> timer);

 private:
  // Bookkeeping shared by |MarkAsSucceeded|, |MarkAsFailed| and |Cancel|:
  // |task| stops counting as in progress and is forgotten by |tasks_|.
  void FinishTask(const T& task);

  // Arm |backoff_timer_| to run |Dispatch| once the current backoff delay has
  // elapsed, unless the timer is already running or there is nothing to do.
  void ScheduleDispatch();

  // Hand the task at the front of |queue_| to |process_callback_|.
  void Dispatch();

  // Return true if we should dispatch tasks.
  bool ShouldDispatch();

  const HandleTaskCallback process_callback_;

  // |backoff_entry_| is constructed with a pointer to |backoff_policy_|, so
  // the policy is declared first (and therefore destroyed last).
  net::BackoffEntry::Policy backoff_policy_;
  std::unique_ptr<net::BackoffEntry> backoff_entry_;

  // The number of tasks currently being handled.
  int num_in_progress_;

  // Tasks waiting to be dispatched, in the order they were added.
  std::deque<T> queue_;

  // The set of tasks in queue_ or currently being handled.
  std::set<T> tasks_;

  // Runs |Dispatch| through a weak pointer to this queue.
  base::Closure dispatch_closure_;

  std::unique_ptr<base::Timer> backoff_timer_;

  // Must be last data member.
  base::WeakPtrFactory<TaskQueue> weak_ptr_factory_;

  DISALLOW_COPY_AND_ASSIGN(TaskQueue);
};
// The maximum number of tasks that may be concurrently executed; Dispatch()
// and ShouldDispatch() compare |num_in_progress_| against it. Think carefully
// before changing this value. The desired behavior of backoff may not be
// obvious when there is more than one concurrent task.
const int kMaxConcurrentTasks = 1;
// Builds an idle queue: nothing is in progress, the backoff policy doubles the
// delay on each failure (with 10% jitter) between |initial_backoff_delay| and
// |max_backoff_delay|, and the dispatch closure is bound through a weak
// pointer to this object.
template <typename T>
TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback,
                        const base::TimeDelta& initial_backoff_delay,
                        const base::TimeDelta& max_backoff_delay)
    : process_callback_(callback),
      backoff_policy_({}),
      num_in_progress_(0),
      weak_ptr_factory_(this) {
  DCHECK_LE(initial_backoff_delay.InMicroseconds(),
            max_backoff_delay.InMicroseconds());

  // Populate the policy before the entry is created, because the entry is
  // handed a pointer to it.
  net::BackoffEntry::Policy& policy = backoff_policy_;
  policy.initial_delay_ms = initial_backoff_delay.InMilliseconds();
  policy.maximum_backoff_ms = max_backoff_delay.InMilliseconds();
  policy.multiply_factor = 2.0;
  policy.jitter_factor = 0.1;
  // NOTE(review): -1 presumably means "never discard the entry" -- confirm
  // against net::BackoffEntry::Policy.
  policy.entry_lifetime_ms = -1;
  policy.always_use_initial_delay = false;
  backoff_entry_ = base::MakeUnique<net::BackoffEntry>(&backoff_policy_);

  // NOTE(review): both flags false presumably selects a one-shot timer that
  // does not retain its task -- confirm against base::Timer.
  backoff_timer_ = base::MakeUnique<base::Timer>(false, false);

  // Bound through a weak pointer; |weak_ptr_factory_| is the last member, so
  // it is already constructed here.
  dispatch_closure_ =
      base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr());
}
// Appends |task| unless it is already tracked in |tasks_|, then asks for a
// dispatch to be scheduled whether or not anything was added.
template <typename T>
void TaskQueue<T>::AddToQueue(const T& task) {
  DCHECK(CalledOnValidThread());
  // |tasks_| covers both queued and in-progress tasks, so a hit here means
  // |task| is a duplicate and is ignored.
  const bool already_tracked = tasks_.count(task) != 0;
  if (!already_tracked) {
    queue_.push_back(task);
    tasks_.insert(task);
  }
  ScheduleDispatch();
}
// Completes |task| as a success and clears the backoff state.
template <typename T>
void TaskQueue<T>::MarkAsSucceeded(const T& task) {
  DCHECK(CalledOnValidThread());
  FinishTask(task);
  // The task succeeded. ResetBackoff() stops any pending timer, resets
  // (clears) the backoff, and reschedules a dispatch.
  ResetBackoff();
}
// Completes |task| as a failure, reports the failure to the backoff entry and
// asks for the next dispatch to be scheduled.
template <typename T>
void TaskQueue<T>::MarkAsFailed(const T& task) {
  DCHECK(CalledOnValidThread());
  FinishTask(task);
  // Report the request as unsuccessful before scheduling, so the delay read
  // by ScheduleDispatch() already reflects this failure.
  const bool request_succeeded = false;
  backoff_entry_->InformOfRequest(request_succeeded);
  ScheduleDispatch();
}
// Completes |task| without touching |backoff_entry_| or |backoff_timer_|
// directly, then asks for the next dispatch to be scheduled.
template <typename T>
void TaskQueue<T>::Cancel(const T& task) {
  DCHECK(CalledOnValidThread());
  // Drop the task from the in-progress bookkeeping.
  FinishTask(task);
  // A slot may have freed up, so give the next queued task a chance to run.
  ScheduleDispatch();
}
// Clears the accumulated backoff and resumes dispatching: any pending backoff
// timer is stopped, the backoff entry is reset, and a new dispatch is
// scheduled.
template <typename T>
void TaskQueue<T>::ResetBackoff() {
  // Like every other public method, this touches the timer and backoff state
  // and so must run on the thread the queue is bound to.
  DCHECK(CalledOnValidThread());
  backoff_timer_->Stop();
  backoff_entry_->Reset();
  ScheduleDispatch();
}
// Replaces |backoff_timer_| with the caller-supplied |timer|, which must not
// be null. Intended for tests.
template <typename T>
void TaskQueue<T>::SetTimerForTest(std::unique_ptr<base::Timer> timer) {
  DCHECK(CalledOnValidThread());
  DCHECK(timer != nullptr);
  // Takes ownership; the previously installed timer is destroyed here.
  backoff_timer_ = std::move(timer);
}
// Shared bookkeeping for a task that has finished being handled: decrements
// the in-progress count and removes |task| from |tasks_|.
template <typename T>
void TaskQueue<T>::FinishTask(const T& task) {
  DCHECK(CalledOnValidThread());
  // Some task must be in progress for one to finish.
  DCHECK_GE(num_in_progress_, 1);
  num_in_progress_ -= 1;
  // |task| is expected to have been tracked exactly once.
  const size_t erased_count = tasks_.erase(task);
  DCHECK_EQ(1U, erased_count);
}
// Starts |backoff_timer_| so that |dispatch_closure_| runs after the delay
// reported by |backoff_entry_|. Does nothing if the timer is already running
// or ShouldDispatch() is false.
template <typename T>
void TaskQueue<T>::ScheduleDispatch() {
  DCHECK(CalledOnValidThread());
  // A dispatch is already pending.
  if (backoff_timer_->IsRunning()) {
    return;
  }
  // Nothing to dispatch right now.
  if (!ShouldDispatch()) {
    return;
  }
  const base::TimeDelta wait = backoff_entry_->GetTimeUntilRelease();
  backoff_timer_->Start(FROM_HERE, wait, dispatch_closure_);
}
// Removes the task at the front of |queue_|, counts it as in progress, and
// posts |process_callback_| bound to it on the current thread's task runner.
// Does nothing when ShouldDispatch() is false.
template <typename T>
void TaskQueue<T>::Dispatch() {
  DCHECK(CalledOnValidThread());
  if (!ShouldDispatch()) {
    return;
  }
  DCHECK(!queue_.empty());
  // Bind the callback to the front task before popping, so the closure does
  // not refer to the element that is about to be removed.
  const base::Closure handle_front =
      base::Bind(process_callback_, queue_.front());
  queue_.pop_front();
  ++num_in_progress_;
  DCHECK_LE(num_in_progress_, kMaxConcurrentTasks);
  base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, handle_front);
}
// Returns true when fewer than |kMaxConcurrentTasks| tasks are in progress
// and at least one task is waiting in |queue_|.
template <typename T>
bool TaskQueue<T>::ShouldDispatch() {
  // No free slot for another task.
  if (num_in_progress_ >= kMaxConcurrentTasks) {
    return false;
  }
  return !queue_.empty();
}
} // namespace syncer
#endif // COMPONENTS_SYNC_MODEL_IMPL_ATTACHMENTS_TASK_QUEUE_H_
|