1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
|
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_
#define BASE_THREADING_SEQUENCED_WORKER_POOL_H_
#include <cstddef>
#include <string>
#include "base/base_export.h"
#include "base/basictypes.h"
#include "base/callback_forward.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/task_runner.h"
namespace tracked_objects {
class Location;
} // namespace tracked_objects
namespace base {
class MessageLoopProxy;
template <class T> class DeleteHelper;
class SequencedTaskRunner;
// A worker thread pool that enforces ordering between sets of tasks. It also
// allows you to specify what should happen to your tasks on shutdown.
//
// To enforce ordering, get a unique sequence token from the pool and post all
// tasks you want to order with the token. All tasks with the same token are
// guaranteed to execute serially, though not necessarily on the same thread.
// This means that:
//
// - No two tasks with the same token will run at the same time.
//
// - Given two tasks T1 and T2 with the same token such that T2 will
// run after T1, then T2 will start after T1 is destroyed.
//
// - If T2 will run after T1, then all memory changes in T1 and T1's
// destruction will be visible to T2.
//
// Example:
// SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken();
// pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
// FROM_HERE, base::Bind(...));
// pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
// FROM_HERE, base::Bind(...));
//
// You can make named sequence tokens to make it easier to share a token
// across different components.
//
// You can also post tasks to the pool without ordering using PostWorkerTask.
// These will be executed in an unspecified order. The order of execution
// between tasks with different sequence tokens is also unspecified.
//
// This class may be leaked on shutdown to facilitate fast shutdown. The
// expected usage, however, is to call Shutdown(), which correctly accounts
// for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN
// behavior.
//
// Implementation note: This does not use a base::WorkerPool since that does
// not enforce shutdown semantics or allow us to specify how many worker
// threads to run. For the typical use case of random background work, we don't
// necessarily want to be super aggressive about creating threads.
//
// Note that SequencedWorkerPool is RefCountedThreadSafe (inherited
// from TaskRunner).
//
// Test-only code should wrap this in a base::SequencedWorkerPoolOwner to avoid
// memory leaks. See http://crbug.com/273800
class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
public:
// Defines what should happen to a task posted to the worker pool on
// shutdown.
enum WorkerShutdown {
// Tasks posted with this mode which have not run at shutdown will be
// deleted rather than run, and any tasks with this mode running at
// shutdown will be ignored (the worker thread will not be joined).
//
// This option provides a nice way to post stuff you don't want blocking
// shutdown. For example, you might be doing a slow DNS lookup and if it's
// blocked on the OS, you may not want to stop shutdown, since the result
// doesn't really matter at that point.
//
// However, you need to be very careful what you do in your callback when
// you use this option. Since the thread will continue to run until the OS
// terminates the process, the app can be in the process of tearing down
// when you're running. This means any singletons or global objects you
// use may suddenly become invalid out from under you. For this reason,
// it's best to use this only for slow but simple operations like the DNS
// example.
CONTINUE_ON_SHUTDOWN,
// Tasks posted with this mode that have not started executing at
// shutdown will be deleted rather than executed. However, any tasks that
// have already begun executing when shutdown is called will be allowed
// to continue, and will block shutdown until completion.
//
// Note: Because Shutdown() may block while these tasks are executing,
// care must be taken to ensure that they do not block on the thread that
// called Shutdown(), as this may lead to deadlock.
SKIP_ON_SHUTDOWN,
// Tasks posted with this mode will block shutdown until they're
// executed. Since this can have significant performance implications,
// use sparingly.
//
// Generally, this should be used only for user data, for example, a task
// writing a preference file.
//
// If a task is posted during shutdown, it will not get run since the
// workers may already be stopped. In this case, the post operation will
// fail (return false) and the task will be deleted.
BLOCK_SHUTDOWN,
};
// Opaque identifier that defines sequencing of tasks posted to the worker
// pool.
class SequenceToken {
public:
SequenceToken() : id_(0) {}
~SequenceToken() {}
bool Equals(const SequenceToken& other) const {
return id_ == other.id_;
}
// Returns false if current thread is executing an unsequenced task.
bool IsValid() const {
return id_ != 0;
}
private:
friend class SequencedWorkerPool;
explicit SequenceToken(int id) : id_(id) {}
int id_;
};
// Allows tests to perform certain actions.
class TestingObserver {
public:
virtual ~TestingObserver() {}
virtual void OnHasWork() = 0;
virtual void WillWaitForShutdown() = 0;
virtual void OnDestruct() = 0;
};
// Gets the SequencedToken of the current thread.
// If current thread is not a SequencedWorkerPool worker thread or is running
// an unsequenced task, returns an invalid SequenceToken.
static SequenceToken GetSequenceTokenForCurrentThread();
// When constructing a SequencedWorkerPool, there must be a
// MessageLoop on the current thread unless you plan to deliberately
// leak it.
// Pass the maximum number of threads (they will be lazily created as needed)
// and a prefix for the thread name to aid in debugging.
SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix);
// Like above, but with |observer| for testing. Does not take
// ownership of |observer|.
SequencedWorkerPool(size_t max_threads,
const std::string& thread_name_prefix,
TestingObserver* observer);
// Returns a unique token that can be used to sequence tasks posted to
// PostSequencedWorkerTask(). Valid tokens are always nonzero.
SequenceToken GetSequenceToken();
// Returns the sequence token associated with the given name. Calling this
// function multiple times with the same string will always produce the
// same sequence token. If the name has not been used before, a new token
// will be created.
SequenceToken GetNamedSequenceToken(const std::string& name);
// Returns a SequencedTaskRunner wrapper which posts to this
// SequencedWorkerPool using the given sequence token. Tasks with nonzero
// delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
// are posted with BLOCK_SHUTDOWN behavior.
scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
SequenceToken token);
// Returns a SequencedTaskRunner wrapper which posts to this
// SequencedWorkerPool using the given sequence token. Tasks with nonzero
// delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
// are posted with the given shutdown behavior.
scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior(
SequenceToken token,
WorkerShutdown shutdown_behavior);
// Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using
// the given shutdown behavior. Tasks with nonzero delay are posted with
// SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the
// given shutdown behavior.
scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior(
WorkerShutdown shutdown_behavior);
// Posts the given task for execution in the worker pool. Tasks posted with
// this function will execute in an unspecified order on a background thread.
// Returns true if the task was posted. If your tasks have ordering
// requirements, see PostSequencedWorkerTask().
//
// This class will attempt to delete tasks that aren't run
// (non-block-shutdown semantics) but can't guarantee that this happens. If
// all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there
// will be no workers available to delete these tasks. And there may be
// tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN
// tasks. Deleting those tasks before the previous one has completed could
// cause nondeterministic crashes because the task could be keeping some
// objects alive which do work in their destructor, which could voilate the
// assumptions of the running task.
//
// The task will be guaranteed to run to completion before shutdown
// (BLOCK_SHUTDOWN semantics).
//
// Returns true if the task was posted successfully. This may fail during
// shutdown regardless of the specified ShutdownBehavior.
bool PostWorkerTask(const tracked_objects::Location& from_here,
const Closure& task);
// Same as PostWorkerTask but allows a delay to be specified (although doing
// so changes the shutdown behavior). The task will be run after the given
// delay has elapsed.
//
// If the delay is nonzero, the task won't be guaranteed to run to completion
// before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
// If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the
// task will be guaranteed to run to completion before shutdown
// (BLOCK_SHUTDOWN semantics).
bool PostDelayedWorkerTask(const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay);
// Same as PostWorkerTask but allows specification of the shutdown behavior.
bool PostWorkerTaskWithShutdownBehavior(
const tracked_objects::Location& from_here,
const Closure& task,
WorkerShutdown shutdown_behavior);
// Like PostWorkerTask above, but provides sequencing semantics. This means
// that tasks posted with the same sequence token (see GetSequenceToken())
// are guaranteed to execute in order. This is useful in cases where you're
// doing operations that may depend on previous ones, like appending to a
// file.
//
// The task will be guaranteed to run to completion before shutdown
// (BLOCK_SHUTDOWN semantics).
//
// Returns true if the task was posted successfully. This may fail during
// shutdown regardless of the specified ShutdownBehavior.
bool PostSequencedWorkerTask(SequenceToken sequence_token,
const tracked_objects::Location& from_here,
const Closure& task);
// Like PostSequencedWorkerTask above, but allows you to specify a named
// token, which saves an extra call to GetNamedSequenceToken.
bool PostNamedSequencedWorkerTask(const std::string& token_name,
const tracked_objects::Location& from_here,
const Closure& task);
// Same as PostSequencedWorkerTask but allows a delay to be specified
// (although doing so changes the shutdown behavior). The task will be run
// after the given delay has elapsed.
//
// If the delay is nonzero, the task won't be guaranteed to run to completion
// before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
// If the delay is zero, this behaves exactly like PostSequencedWorkerTask,
// i.e. the task will be guaranteed to run to completion before shutdown
// (BLOCK_SHUTDOWN semantics).
bool PostDelayedSequencedWorkerTask(
SequenceToken sequence_token,
const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay);
// Same as PostSequencedWorkerTask but allows specification of the shutdown
// behavior.
bool PostSequencedWorkerTaskWithShutdownBehavior(
SequenceToken sequence_token,
const tracked_objects::Location& from_here,
const Closure& task,
WorkerShutdown shutdown_behavior);
// TaskRunner implementation. Forwards to PostDelayedWorkerTask().
virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay) OVERRIDE;
virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
// Returns true if the current thread is processing a task with the given
// sequence_token.
bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
// Blocks until all pending tasks are complete. This should only be called in
// unit tests when you want to validate something that should have happened.
// This will not flush delayed tasks; delayed tasks get deleted.
//
// Note that calling this will not prevent other threads from posting work to
// the queue while the calling thread is waiting on Flush(). In this case,
// Flush will return only when there's no more work in the queue. Normally,
// this doesn't come up since in a test, all the work is being posted from
// the main thread.
void FlushForTesting();
// Spuriously signal that there is work to be done.
void SignalHasWorkForTesting();
// Implements the worker pool shutdown. This should be called during app
// shutdown, and will discard/join with appropriate tasks before returning.
// After this call, subsequent calls to post tasks will fail.
//
// Must be called from the same thread this object was constructed on.
void Shutdown() { Shutdown(0); }
// A variant that allows an arbitrary number of new blocking tasks to
// be posted during shutdown from within tasks that execute during shutdown.
// Only tasks designated as BLOCKING_SHUTDOWN will be allowed, and only if
// posted by tasks that are not designated as CONTINUE_ON_SHUTDOWN. Once
// the limit is reached, subsequent calls to post task fail in all cases.
//
// Must be called from the same thread this object was constructed on.
void Shutdown(int max_new_blocking_tasks_after_shutdown);
// Check if Shutdown was called for given threading pool. This method is used
// for aborting time consuming operation to avoid blocking shutdown.
//
// Can be called from any thread.
bool IsShutdownInProgress();
protected:
virtual ~SequencedWorkerPool();
virtual void OnDestruct() const OVERRIDE;
private:
friend class RefCountedThreadSafe<SequencedWorkerPool>;
friend class DeleteHelper<SequencedWorkerPool>;
class Inner;
class Worker;
const scoped_refptr<MessageLoopProxy> constructor_message_loop_;
// Avoid pulling in too many headers by putting (almost) everything
// into |inner_|.
const scoped_ptr<Inner> inner_;
DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool);
};
} // namespace base
#endif // BASE_THREADING_SEQUENCED_WORKER_POOL_H_
|