File: TaskQueue.cpp

package info (click to toggle)
endless-sky 0.10.16-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 414,608 kB
  • sloc: cpp: 73,435; python: 893; xml: 666; sh: 271; makefile: 28
file content (181 lines) | stat: -rw-r--r-- 4,581 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/* TaskQueue.cpp
Copyright (c) 2022 by Michael Zahniser

Endless Sky is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later version.

Endless Sky is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>.
*/

#include "TaskQueue.h"

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <exception>

using namespace std;

namespace {

	// The main task queue used by the worker threads.
	queue<TaskQueue::Task> tasks;
	mutex asyncMutex;
	condition_variable asyncCondition;
	bool shouldQuit = false;

	// Worker threads for executing tasks.
	struct WorkerThreads {
		WorkerThreads() noexcept
		{
			threads.resize(max(4u, thread::hardware_concurrency()));
			for(thread &t : threads)
				t = thread(&TaskQueue::ThreadLoop);
		}
		~WorkerThreads()
		{
			{
				lock_guard<mutex> lock(asyncMutex);
				shouldQuit = true;
			}
			asyncCondition.notify_all();
			for(thread &t : threads)
				t.join();
		}

		vector<thread> threads;
	} threads;
}



TaskQueue::~TaskQueue()
{
	// Make sure every task that belongs to this queue is finished.
	Wait();
}



// Queue a function to execute in parallel, with another optional function that
// will get executed on the main thread after the first function finishes.
// Returns a future representing the future result of the async call. Ignores
// any main thread task that still need to be executed!
std::shared_future<void> TaskQueue::Run(function<void()> asyncTask, function<void()> syncTask)
{
	std::shared_future<void> result;
	{
		lock_guard<mutex> lock(asyncMutex);
		// Do nothing if we are destroying the queue already.
		if(shouldQuit)
			return result;

		// Queue this task for execution and create a future to track its state.
		tasks.push(Task{this, std::move(asyncTask), std::move(syncTask)});
		result = futures.emplace_back(tasks.back().futurePromise.get_future());
		tasks.back().futureIt = std::prev(futures.end());
	}
	asyncCondition.notify_one();
	return result;
}



// Process any tasks to be scheduled to be executed on the main thread.
void TaskQueue::ProcessSyncTasks()
{
	unique_lock<mutex> lock(syncMutex);
	for(int i = 0; !syncTasks.empty() && i < MAX_SYNC_TASKS; ++i)
	{
		// Extract the one item we should work on right now.
		auto task = std::move(syncTasks.front());
		syncTasks.pop();

		lock.unlock();
		task();
		lock.lock();
	}
}



// Waits for all of this queue's task to finish. Ignores any sync tasks to be processed.
void TaskQueue::Wait()
{
	while(!IsDone())
		this_thread::yield();
}



// Whether there are any outstanding async tasks left in this queue.
bool TaskQueue::IsDone() const
{
	lock_guard<mutex> lock(asyncMutex);
	return futures.empty();
}



// Thread entry point.
void TaskQueue::ThreadLoop() noexcept
{
	while(true)
	{
		unique_lock<mutex> lock(asyncMutex);
		while(true)
		{
			// Check whether it is time for this thread to quit.
			if(shouldQuit)
				return;
			// No more tasks to execute, just go to sleep.
			if(tasks.empty())
				break;

			// Extract the one item we should work on reading right now.
			auto task = std::move(tasks.front());
			tasks.pop();

			// Unlock the mutex so other threads can access the queue.
			lock.unlock();

			// Execute the task.
			try {
				if(task.async)
					task.async();
			}
			catch(...)
			{
				// Any exception by the task is caught and rethrown inside the main thread
				// so we can handle it appropriately.
				auto exception = current_exception();
				task.sync = [exception] { rethrow_exception(exception); };
			}

			// If there is a followup function to execute, queue it for execution
			// in the main thread.
			if(task.sync)
			{
				unique_lock<mutex> lock(task.queue->syncMutex);
				task.queue->syncTasks.push(std::move(task.sync));
			}

			// We are done and can mark the future as ready.
			task.futurePromise.set_value();

			lock.lock();

			// Now that the task has been executed, stop tracking the future internally.
			// Anybody who still cares about the future will have a copy themselves.
			task.queue->futures.erase(task.futureIt);
		}

		asyncCondition.wait(lock, [] { return shouldQuit || !tasks.empty(); });
	}
}