/* Copyright 2017-present Facebook, Inc.
* Licensed under the Apache License, Version 2.0 */
#include "ThreadPool.h"
#include "watchman_log.h"
namespace watchman {
// Returns a reference to the shared ThreadPool instance.
//
// The pool is a function-local static: it is constructed the first time this
// function is called and the same object is returned on every later call.
// This function only constructs the pool; it does not call start() on it.
ThreadPool& getThreadPool() {
  static ThreadPool sharedPool;
  return sharedPool;
}
// Destructor: shuts the pool down by calling stop() with no explicit
// argument, i.e. with whatever default the class declaration gives `join`
// (that declaration is not visible in this file).
ThreadPool::~ThreadPool() {
  this->stop();
}
// Spawns `numWorkers` worker threads and records `maxItems` as the queue
// limit that run() checks before accepting a task.
//
// Throws std::runtime_error if worker threads already exist, or if stop()
// has already been called on this pool (a stopped pool cannot be restarted).
void ThreadPool::start(size_t numWorkers, size_t maxItems) {
  std::lock_guard<std::mutex> guard(mutex_);

  if (!workers_.empty()) {
    throw std::runtime_error("ThreadPool already started");
  }
  if (stopping_) {
    throw std::runtime_error("Cannot restart a stopped pool");
  }

  maxItems_ = maxItems;

  unsigned workerIndex = 0;
  while (workerIndex < numWorkers) {
    // Each thread names itself after its index, then serves the queue until
    // runWorker() returns.
    workers_.emplace_back([this, workerIndex] {
      w_set_thread_name("ThreadPool-%i", workerIndex);
      runWorker();
    });
    ++workerIndex;
  }
}
void ThreadPool::runWorker() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
if (stopping_ && tasks_.empty()) {
return;
}
task = std::move(tasks_.front());
tasks_.pop_front();
}
task();
}
}
// Signals shutdown: sets stopping_ under the lock, wakes every worker, and,
// when `join` is true, waits for each worker thread to finish.
//
// Once stopping_ is set, run() rejects new tasks, and each worker exits as
// soon as it finds the queue empty (see runWorker()).
//
// Safe to call more than once: threads that have already been joined are
// skipped. This matters because the destructor calls stop() unconditionally,
// and std::thread::join() throws std::system_error when the thread is not
// joinable.
void ThreadPool::stop(bool join) {
  {
    std::unique_lock<std::mutex> lock(mutex_);
    stopping_ = true;
  }
  // Notify after releasing the lock so woken workers can acquire it at once.
  condition_.notify_all();

  if (!join) {
    return;
  }

  for (auto& worker : workers_) {
    // A previous stop(true) may already have joined this thread.
    if (worker.joinable()) {
      worker.join();
    }
  }
}
// Queues `func` for execution on a worker thread and wakes one worker.
//
// Throws std::runtime_error if stop() has already been called on this pool,
// or if the queue already holds maxItems_ tasks.
void ThreadPool::run(std::function<void()>&& func) {
  {
    std::unique_lock<std::mutex> lock(mutex_);
    if (stopping_) {
      throw std::runtime_error("cannot add tasks after pool has stopped");
    }
    // Reject only when the queue is already at capacity. Comparing
    // `size() + 1 >= maxItems_` would refuse the task that brings the queue
    // to exactly maxItems_, capping it at maxItems_ - 1 entries (and at zero
    // entries when maxItems_ is 1).
    if (tasks_.size() >= maxItems_) {
      throw std::runtime_error("thread pool queue is full");
    }
    tasks_.emplace_back(std::move(func));
  }
  // Notify after releasing the lock so the woken worker can take it at once.
  condition_.notify_one();
}
}
|