File: ThreadedEventProcessor.h

package info (click to toggle)
freespace2 24.2.0%2Brepack-1
  • links: PTS, VCS
  • area: non-free
  • in suites: forky, sid
  • size: 43,716 kB
  • sloc: cpp: 595,001; ansic: 21,741; python: 1,174; sh: 457; makefile: 248; xml: 181
file content (77 lines) | stat: -rw-r--r-- 1,804 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#pragma once

#include "globalincs/pstypes.h"
#include "tracing/tracing.h"

#include "utils/boost/syncboundedqueue.h"

#include <thread>


/** @file
 *  @ingroup tracing
 */

namespace tracing {

/**
 * @brief A multi-threaded event processor
 *
 * This is a utility class that can be used to implement an event processor that uses a different thread to process the
 * events. To use this, declare your class with a method with the signature
 *
 * @code{.cpp}
 * void processEvent(const trace_event* event)
 * @endcode
 *
 * This function will be called in a background-thread whenever a new event arrives. The pointer passed to it refers
 * to a copy of the event that is only valid for the duration of the call.
 *
 * The background thread is started by the constructor and is stopped (queue closed, thread joined) by the destructor.
 * Instances can neither be copied nor moved since the background thread keeps a pointer to the instance.
 *
 * @tparam Processor Your processor implementation
 * @tparam QUEUE_SIZE The maximum size of the internal event buffer
 */
template<class Processor, size_t QUEUE_SIZE = 200>
class ThreadedEventProcessor {
	// Members are initialized in declaration order and destroyed in reverse order. The worker thread uses both the
	// queue and the processor so it must be declared last: this guarantees that everything it touches is fully
	// constructed before the thread starts and that a throwing Processor constructor never leaves a joinable
	// std::thread behind (which would call std::terminate).
	sync_bounded_queue<trace_event> _event_queue;

	Processor _processor;

	std::thread _worker_thread;

	/**
	 * @brief Entry point of the background thread
	 *
	 * Pulls events from the queue and hands them to the wrapped processor until the queue reports that it has been
	 * closed, either through closed(), a non-success status or a sync_queue_is_closed exception.
	 */
	void workerThread() {
		while (!_event_queue.closed()) {
			try {
				trace_event evt;
				auto status = _event_queue.wait_pull_front(evt);

				if (status != success) {
					// No event was retrieved so there is nothing to process anymore
					break;
				}

				_processor.processEvent(&evt);
			}
			catch (const sync_queue_is_closed&) {
				// We are done here
				break;
			}
		}
	}
 public:
	/**
	 * @brief Constructs the wrapped processor and starts the background thread
	 *
	 * @param params Arguments which are forwarded to the constructor of @c Processor
	 */
	template<typename... Params>
	explicit ThreadedEventProcessor(Params&& ... params)
		: _event_queue(QUEUE_SIZE), _processor(std::forward<Params>(params)...),
		  // Use the injected class name so that the member pointer matches this exact specialization. Spelling it
		  // as ThreadedEventProcessor<Processor> would silently refer to the default QUEUE_SIZE instead.
		  _worker_thread(&ThreadedEventProcessor::workerThread, this) {}

	// The worker thread holds a pointer to this instance so copying can never be valid. This was already implied by
	// the std::thread member but is spelled out here to make the intent explicit.
	ThreadedEventProcessor(const ThreadedEventProcessor&) = delete;
	ThreadedEventProcessor& operator=(const ThreadedEventProcessor&) = delete;

	/**
	 * @brief Closes the event queue and waits for the background thread to finish
	 */
	~ThreadedEventProcessor() {
		_event_queue.close();
		_worker_thread.join();
	}

	/**
	 * @brief Queues a copy of the event for processing in the background thread
	 *
	 * NOTE(review): wait_push_back presumably blocks while the queue is full -- confirm against syncboundedqueue.h
	 *
	 * @param event The event to enqueue. Must not be @c nullptr; the event is copied into the queue.
	 */
	void processEvent(const trace_event* event) {
		try {
			_event_queue.wait_push_back(*event);
		} catch (const sync_queue_is_closed&) {
			mprintf(("Stream queue was closed in processEvent! This should not be possible...\n"));
		}
	}
};

}