#pragma once

#include <chrono>
#include <mutex>
#include <list>
#include <functional>
#include <future>

namespace util
{
/**
 * Queueing helper that runs queued tasks strictly one after the other.
 * Each task is executed asynchronously (by means of std::async), and
 * no task is started before the previous one has completed.
 *
 * Destroying this object removes all unstarted tasks from the queue
 * and blocks until the currently running task is done.
*/
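// Usage sketch (illustrative only, not part of the original header).
// loadTextures and buildIndex are hypothetical callables standing in
// for real work:
//
//   util::SequentialTaskQueue queue;
//   queue.enqueue([] { loadTextures(); }); // launched right away if the queue is idle
//   queue.enqueue([] { buildIndex(); });   // not started while another task is running
//   queue.clearPendingTasks();             // drops tasks that have not been started yet
//
// When queue goes out of scope, its destructor discards the pending
// tasks and waits for the running one to complete.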
class SequentialTaskQueue
{
private:
    // Protects the list of pending tasks
    mutable std::mutex _queueLock;
    std::list<std::function<void()>> _queue;

    // Protects the futures while a task is started or checked
    mutable std::recursive_mutex _currentLock;
    std::future<void> _current;
    std::future<void> _finished;

public:
    ~SequentialTaskQueue()
    {
        clear();
    }

    // Adds the given task to the queue. The task is launched
    // immediately if no other task is currently being processed.
    void enqueue(const std::function<void()>& task)
    {
        {
            std::lock_guard<std::mutex> lock(_queueLock);
            // Append at the back: dequeueOne() takes from the front,
            // so pending tasks run in the order they were added
            _queue.push_back(task);
        }

        if (isIdle())
        {
            startNextTask();
        }
    }

    // Removes all tasks that have not been started yet.
    // The currently running task is not affected.
    void clearPendingTasks()
    {
        // Hold the queue lock while removing the pending tasks,
        // so that no new ones are added in the meantime
        std::lock_guard<std::mutex> lock(_queueLock);
        _queue.clear();
    }

    // Clears the queue. This may block until the currently
    // active task has finished.
    void clear()
    {
        clearPendingTasks();

        // Releasing a future obtained from std::async blocks
        // until the associated task has completed
        _current = std::future<void>();
        _finished = std::future<void>();
    }

private:
    // Returns true if no task is currently running
    bool isIdle() const
    {
        std::lock_guard<std::recursive_mutex> lock(_currentLock);
        return !_current.valid() ||
            _current.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
    }

    // Removes and returns the next pending task,
    // or an empty function object if the queue is empty
    std::function<void()> dequeueOne()
    {
        std::lock_guard<std::mutex> lock(_queueLock);

        if (_queue.empty())
        {
            return std::function<void()>();
        }

        // Take the task at the front of the queue
        auto frontOfQueue = _queue.front();
        _queue.pop_front();

        return frontOfQueue;
    }

    void startNextTask()
    {
        auto task = dequeueOne();

        if (!task)
        {
            return;
        }

        // Wrap the given task in our own lambda to start the next task right afterwards
        std::lock_guard<std::recursive_mutex> lock(_currentLock);
        _current = std::async(std::launch::async, [this, task]()
        {
            task();

            {
                // Move our own task to the finished lane,
                // to avoid blocking when assigning a new future
                std::lock_guard<std::recursive_mutex> lock(_currentLock);
                _finished = std::move(_current);

                // _current is now empty, so we can start a new task
                // We still hold the _currentLock such that no other thread will
                // check the _current future in isIdle in the meantime.
                // Since the mutex is recursive, this thread can assign to _current
                startNextTask();
            }
        });
    }
};
}