File: SequentialTaskQueue.h

package info (click to toggle)
darkradiant 3.9.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 41,080 kB
  • sloc: cpp: 264,743; ansic: 10,659; python: 1,852; xml: 1,650; sh: 92; makefile: 21
file content (123 lines) | stat: -rw-r--r-- 3,258 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#pragma once

#include <mutex>
#include <list>
#include <functional>
#include <future>

namespace util
{

/**
 * Queueing helper, allowing to run queued tasks one after the other,
 * each of which will be run asynchronously (by means of std::async).
 * No task will be started before a previous one is completed.
 *
 * Destroying this object will remove all unstarted tasks from the queue,
 * but will block until the currently running task is done.
 */
class SequentialTaskQueue
{
private:
    mutable std::mutex _queueLock;
    std::list<std::function<void()>> _queue;

    mutable std::recursive_mutex _currentLock;
    std::future<void> _current;
    std::future<void> _finished;

public:
    ~SequentialTaskQueue()
    {
        clear();
    }

    // Adds the given task to the queue. This will launch the task
    // immediately if no other task is currently processed
    void enqueue(const std::function<void()>& task)
    {
        {
            std::lock_guard<std::mutex> lock(_queueLock);
            _queue.push_front(task);
        }

        if (isIdle())
        {
            startNextTask();
        }
    }

    // Removes all tasks that have not been processed yet
    // Doesn't affect the currently running one
    void clearPendingTasks()
    {
        // Lock the queue and remove any tasks such that no new ones are added
        std::lock_guard<std::mutex> lock(_queueLock);
        _queue.clear();
    }

    // Clears the queue. This might block waiting for any currently 
    // active task to finish
    void clear()
    {
        clearPendingTasks();

        _current = std::future<void>();
        _finished = std::future<void>();
    }

private:
    bool isIdle() const
    {
        std::lock_guard<std::recursive_mutex> lock(_currentLock);
        return !_current.valid() || _current.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
    }

    std::function<void()> dequeueOne()
    {
        std::lock_guard<std::mutex> lock(_queueLock);

        if (_queue.empty())
        {
            return std::function<void()>();
        }

        // No active task, dispatch a new one
        auto frontOfQueue = _queue.front();
        _queue.pop_front();

        return frontOfQueue;
    }

    void startNextTask()
    {
        auto task = dequeueOne();

        if (!task)
        {
            return;
        }

        // Wrap the given task in our own lambda to start the next task right afterwards
        std::lock_guard<std::recursive_mutex> lock(_currentLock);
        _current = std::async(std::launch::async, [this, task]()
        {
            task();

            {
                // Move our own task to the finished lane, 
                // to avoid blocking when assigning a new future
                std::lock_guard<std::recursive_mutex> lock(_currentLock);
                _finished = std::move(_current);

                // _current is now empty, so we can start a new task
                // We still hold the _currentLock such that no other thread will
                // check the _current future in isIdle in the meantime.
                // Since the mutex is recursive, this thread can assign to _current
                startNextTask();
            }
        });
    }
};

}