// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
/*
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
*/
#pragma once
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
/// Thread-safe work queue, unbounded unless a maximum size is set.
//
// This file is an excerpt from Facebook's zstd repo at
// https://github.com/facebook/zstd/. The relevant file is
// contrib/pzstd/utils/WorkQueue.h.
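//
// Example usage (a minimal sketch, not part of the original zstd file; the
// element type, the bound of 4, and the loop count are illustrative
// assumptions, and the caller needs <thread>):
//
//   WorkQueue<int> queue(/*maxSize=*/4);
//   std::thread consumer([&queue] {
//     int value;
//     // pop() returns false once finish() has been called and the queue
//     // has been drained.
//     while (queue.pop(value)) {
//       // ... process value ...
//     }
//   });
//   for (int i = 0; i < 100; ++i) {
//     queue.push(i);  // Blocks while the queue already holds 4 items.
//   }
//   queue.finish();  // Promise no more pushes; wakes the blocked consumer.
//   consumer.join();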
template <typename T>
class WorkQueue {
  // Protects all member variable access
  std::mutex mutex_;
  std::condition_variable readerCv_;
  std::condition_variable writerCv_;
  std::condition_variable finishCv_;
  std::queue<T> queue_;
  bool done_;
  std::size_t maxSize_;

  // Must have lock to call this function
  bool full() const {
    if (maxSize_ == 0) {
      return false;
    }
    return queue_.size() >= maxSize_;
  }
 public:
  /**
   * Constructs an empty work queue with an optional max size.
   * If `maxSize == 0` the queue size is unbounded.
   *
   * @param maxSize The maximum allowed size of the work queue.
   */
  WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
  /**
   * Push an item onto the work queue. Notify a single thread that work is
   * available. If `finish()` has been called, do nothing and return false.
   * If `push()` returns false, then `item` has not been copied from.
   *
   * @param item  Item to push onto the queue.
   * @returns     True upon success, false if `finish()` has been called. An
   *              item was pushed iff `push()` returns true.
   */
  template <typename U>
  bool push(U&& item) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      while (full() && !done_) {
        writerCv_.wait(lock);
      }
      if (done_) {
        return false;
      }
      queue_.push(std::forward<U>(item));
    }
    readerCv_.notify_one();
    return true;
  }
  /**
   * Attempts to pop an item off the work queue. It will block until data is
   * available or `finish()` has been called.
   *
   * @param[out] item  If `pop` returns `true`, it contains the popped item.
   *                   If `pop` returns `false`, it is unmodified.
   * @returns          True upon success. False if the queue is empty and
   *                   `finish()` has been called.
   */
  bool pop(T& item) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      while (queue_.empty() && !done_) {
        readerCv_.wait(lock);
      }
      if (queue_.empty()) {
        assert(done_);
        return false;
      }
      // Move out of the queue to avoid copying the element; this falls back
      // to a copy for types without a move assignment operator.
      item = std::move(queue_.front());
      queue_.pop();
    }
    writerCv_.notify_one();
    return true;
  }
  /**
   * Sets the maximum queue size. If `maxSize == 0` then it is unbounded.
   *
   * @param maxSize The new maximum queue size.
   */
  void setMaxSize(std::size_t maxSize) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      maxSize_ = maxSize;
    }
    writerCv_.notify_all();
  }
  /**
   * Promise that `push()` won't be called again, so once the queue is empty
   * there will never be any more work.
   */
  void finish() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(!done_);
      done_ = true;
    }
    readerCv_.notify_all();
    writerCv_.notify_all();
    finishCv_.notify_all();
  }
  /// Blocks until `finish()` has been called (but the queue may not be empty).
  void waitUntilFinished() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (!done_) {
      finishCv_.wait(lock);
    }
  }
};
} // namespace ROCKSDB_NAMESPACE