File: io-engine.hpp

package info (click to toggle)
icinga2 2.15.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 20,040 kB
  • sloc: cpp: 97,870; sql: 3,261; cs: 1,636; yacc: 1,584; sh: 1,009; ansic: 890; lex: 420; python: 80; makefile: 62; javascript: 12
file content (279 lines) | stat: -rw-r--r-- 7,813 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */

#ifndef IO_ENGINE_H
#define IO_ENGINE_H

#include "base/atomic.hpp"
#include "base/debug.hpp"
#include "base/exception.hpp"
#include "base/lazy-init.hpp"
#include "base/logger.hpp"
#include "base/shared.hpp"
#include <atomic>
#include <exception>
#include <memory>
#include <thread>
#include <utility>
#include <vector>
#include <stdexcept>
#include <boost/context/fixedsize_stack.hpp>
#include <boost/exception/all.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>

#if BOOST_VERSION >= 108700
#	include <boost/asio/detached.hpp>
#endif // BOOST_VERSION >= 108700

namespace icinga
{

/**
 * Scope lock for CPU-bound work done in an I/O thread
 *
 * @ingroup base
 */
class CpuBoundWork
{
public:
	CpuBoundWork(boost::asio::yield_context yc);
	CpuBoundWork(const CpuBoundWork&) = delete;
	CpuBoundWork(CpuBoundWork&&) = delete;
	CpuBoundWork& operator=(const CpuBoundWork&) = delete;
	CpuBoundWork& operator=(CpuBoundWork&&) = delete;
	~CpuBoundWork();

	void Done();

private:
	bool m_Done;
};

/**
 * Scope break for CPU-bound work done in an I/O thread
 *
 * @ingroup base
 */
class IoBoundWorkSlot
{
public:
	IoBoundWorkSlot(boost::asio::yield_context yc);
	IoBoundWorkSlot(const IoBoundWorkSlot&) = delete;
	IoBoundWorkSlot(IoBoundWorkSlot&&) = delete;
	IoBoundWorkSlot& operator=(const IoBoundWorkSlot&) = delete;
	IoBoundWorkSlot& operator=(IoBoundWorkSlot&&) = delete;
	~IoBoundWorkSlot();

private:
	boost::asio::yield_context yc;
};

/**
 * Async I/O engine
 *
 * @ingroup base
 */
class IoEngine
{
	friend CpuBoundWork;
	friend IoBoundWorkSlot;

public:
	IoEngine(const IoEngine&) = delete;
	IoEngine(IoEngine&&) = delete;
	IoEngine& operator=(const IoEngine&) = delete;
	IoEngine& operator=(IoEngine&&) = delete;
	~IoEngine();

	static IoEngine& Get();

	boost::asio::io_context& GetIoContext();

	static inline size_t GetCoroutineStackSize() {
#ifdef _WIN32
		// Increase the stack size for Windows coroutines to prevent exception corruption.
		// Rationale: Low cost Windows agent only & https://github.com/Icinga/icinga2/issues/7431
		return 8 * 1024 * 1024;
#else /* _WIN32 */
		// Increase the stack size for Linux/Unix coroutines for many JSON objects on the stack.
		// This may help mitigate possible stack overflows. https://github.com/Icinga/icinga2/issues/7532
		return 256 * 1024;
		//return boost::coroutines::stack_allocator::traits_type::default_size(); // Default 64 KB
#endif /* _WIN32 */
	}

	template <typename Handler, typename Function>
	static void SpawnCoroutine(Handler& h, Function f) {
		auto wrapper = [f = std::move(f)](boost::asio::yield_context yc) {
			try {
				f(yc);
			} catch (const std::exception& ex) {
				Log(LogCritical, "IoEngine") << "Exception in coroutine: " << DiagnosticInformation(ex);
			} catch (...) {
				try {
					Log(LogCritical, "IoEngine", "Exception in coroutine!");
				} catch (...) {
				}

				// Required for proper stack unwinding when coroutines are destroyed.
				// https://github.com/boostorg/coroutine/issues/39
				throw;
			}
		};

#if BOOST_VERSION >= 108700
		boost::asio::spawn(h,
			std::allocator_arg, boost::context::fixedsize_stack(GetCoroutineStackSize()),
			std::move(wrapper),
			boost::asio::detached
		);
#else // BOOST_VERSION >= 108700
		boost::asio::spawn(h, std::move(wrapper), boost::coroutines::attributes(GetCoroutineStackSize()));
#endif // BOOST_VERSION >= 108700
	}

	static inline
	void YieldCurrentCoroutine(boost::asio::yield_context yc)
	{
		Get().m_AlreadyExpiredTimer.async_wait(yc);
	}

private:
	IoEngine();

	void RunEventLoop();

	static LazyInit<std::unique_ptr<IoEngine>> m_Instance;

	boost::asio::io_context m_IoContext;
	boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
	std::vector<std::thread> m_Threads;
	boost::asio::deadline_timer m_AlreadyExpiredTimer;
	std::atomic_int_fast32_t m_CpuBoundSemaphore;
};

class TerminateIoThread : public std::exception
{
};

/**
 * Awaitable flag which doesn't block I/O threads, inspired by threading.Event from Python
 *
 * @ingroup base
 */
class AsioEvent
{
public:
	AsioEvent(boost::asio::io_context& io, bool init = false);

	void Set();
	void Clear();
	void Wait(boost::asio::yield_context yc);

private:
	boost::asio::deadline_timer m_Timer;
};

/**
 * Like AsioEvent, which only allows waiting for an event to be set, but additionally supports waiting for clearing
 *
 * @ingroup base
 */
class AsioDualEvent
{
public:
	AsioDualEvent(boost::asio::io_context& io, bool init = false);

	void Set();
	void Clear();

	void WaitForSet(boost::asio::yield_context yc);
	void WaitForClear(boost::asio::yield_context yc);

private:
	AsioEvent m_IsTrue, m_IsFalse;
};

/**
 * I/O timeout emulator
 *
 * This class provides a workaround for Boost.ASIO's lack of built-in timeout support.
 * While Boost.ASIO handles asynchronous operations, it does not natively support timeouts for these operations.
 * This class uses a boost::asio::deadline_timer to emulate a timeout by scheduling a callback to be triggered
 * after a specified duration, effectively adding timeout behavior where none exists.
 * The callback is executed within the provided strand, ensuring thread-safety.
 *
 * The constructor returns immediately after scheduling the timeout callback.
 * The callback itself is invoked asynchronously when the timeout occurs.
 * This allows the caller to continue execution while the timeout is running in the background.
 *
 * The class provides a Cancel() method to unschedule any pending callback. If the callback has already been run,
 * calling Cancel() has no effect. This method can be used to abort the timeout early if the monitored operation
 * completes before the callback has been run. The Timeout destructor also automatically cancels any pending callback.
 * A callback is considered pending even if the timeout has already expired,
 * but the callback has not been executed yet due to a busy strand.
 *
 * @ingroup base
 */
class Timeout
{
public:
	using Timer = boost::asio::deadline_timer;

	/**
	 * Schedules onTimeout to be triggered after timeoutFromNow on strand.
	 *
	 * @param strand The strand in which the callback will be executed.
	 *				 The caller must also run in this strand, as well as Cancel() and the destructor!
	 * @param timeoutFromNow The duration after which the timeout callback will be triggered.
	 * @param onTimeout The callback to invoke when the timeout occurs.
	 */
	template<class OnTimeout>
	Timeout(boost::asio::io_context::strand& strand, const Timer::duration_type& timeoutFromNow, OnTimeout onTimeout)
		: m_Timer(strand.context(), timeoutFromNow), m_Cancelled(Shared<Atomic<bool>>::Make(false))
	{
		VERIFY(strand.running_in_this_thread());

		m_Timer.async_wait(boost::asio::bind_executor(
			strand, [cancelled = m_Cancelled, onTimeout = std::move(onTimeout)](boost::system::error_code ec) {
				if (!ec && !cancelled->load()) {
					onTimeout();
				}
			}
		));
	}

	Timeout(const Timeout&) = delete;
	Timeout(Timeout&&) = delete;
	Timeout& operator=(const Timeout&) = delete;
	Timeout& operator=(Timeout&&) = delete;

	/**
	 * Cancels any pending timeout callback.
	 *
	 * Must be called in the strand in which the callback was scheduled!
	 */
	~Timeout()
	{
		Cancel();
	}

	void Cancel();

private:
	Timer m_Timer;

	/**
	 * Indicates whether the Timeout has been cancelled.
	 *
	 * This must be Shared<> between the lambda in the constructor and Cancel() for the case
	 * the destructor calls Cancel() while the lambda is already queued in the strand.
	 * The whole Timeout instance can't be kept alive by the lambda because this would delay the destructor.
	 */
	Shared<Atomic<bool>>::Ptr m_Cancelled;
};

}

#endif /* IO_ENGINE_H */