1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
|
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/single_thread_task_runner.h"
#include "base/thread_task_runner_handle.h"
#include "chrome/browser/chromeos/file_system_provider/queue.h"
namespace chromeos {
namespace file_system_provider {
Queue::Task::Task() : token(0), completed(false) {
}
Queue::Task::Task(size_t token, const AbortableCallback& callback)
: token(token), completed(false), callback(callback) {
}
Queue::Task::~Task() {
}
Queue::Queue(size_t max_in_parallel)
: max_in_parallel_(max_in_parallel),
next_token_(1),
weak_ptr_factory_(this) {
DCHECK_LT(0u, max_in_parallel);
}
Queue::~Queue() {
}
size_t Queue::NewToken() {
return next_token_++;
}
AbortCallback Queue::Enqueue(size_t token, const AbortableCallback& callback) {
#if !NDEBUG
const auto it = executed_.find(token);
DCHECK(it == executed_.end());
for (auto& task : pending_) {
DCHECK(token != task.token);
}
#endif
pending_.push_back(Task(token, callback));
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::Bind(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
return base::Bind(&Queue::Abort, weak_ptr_factory_.GetWeakPtr(), token);
}
void Queue::Complete(size_t token) {
const auto it = executed_.find(token);
DCHECK(it != executed_.end() && !it->second.completed);
it->second.completed = true;
}
void Queue::Remove(size_t token) {
const auto it = executed_.find(token);
DCHECK(it != executed_.end() && it->second.completed);
executed_.erase(it);
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::Bind(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
}
void Queue::MaybeRun() {
if (executed_.size() == max_in_parallel_ || !pending_.size()) {
return;
}
DCHECK_GT(max_in_parallel_, executed_.size());
Task task = pending_.front();
pending_.pop_front();
executed_[task.token] = task;
executed_[task.token].abort_callback = task.callback.Run();
}
void Queue::Abort(size_t token,
const storage::AsyncFileUtil::StatusCallback& callback) {
// Check if it's running.
const auto it = executed_.find(token);
if (it != executed_.end()) {
const Task& task = it->second;
// If the task is marked as completed, then it's impossible to abort it.
if (task.completed) {
callback.Run(base::File::FILE_ERROR_INVALID_OPERATION);
return;
}
DCHECK(!task.abort_callback.is_null());
it->second.abort_callback.Run(callback);
executed_.erase(it);
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE,
base::Bind(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
return;
}
// Aborting not running tasks is linear. TODO(mtomasz): Optimize if feasible.
for (auto it = pending_.begin(); it != pending_.end(); ++it) {
if (token == it->token) {
pending_.erase(it);
callback.Run(base::File::FILE_OK);
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE,
base::Bind(&Queue::MaybeRun, weak_ptr_factory_.GetWeakPtr()));
return;
}
}
// The task is already removed.
callback.Run(base::File::FILE_ERROR_INVALID_OPERATION);
}
} // namespace file_system_provider
} // namespace chromeos
|