File: mu-async-queue.hh

package info (click to toggle)
maildir-utils 1.12.14-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,996 kB
  • sloc: cpp: 56,429; lisp: 11,218; sh: 895; ansic: 889; makefile: 126; pascal: 12; python: 4
file content (184 lines) | stat: -rw-r--r-- 4,617 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/*
** Copyright (C) 2020-2023 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the
** Free Software Foundation; either version 3, or (at your option) any
** later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software Foundation,
** Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**
*/

#ifndef __MU_ASYNC_QUEUE_HH__
#define __MU_ASYNC_QUEUE_HH__

#include <deque>
#include <mutex>
#include <chrono>
#include <condition_variable>

namespace Mu {

constexpr std::size_t UnlimitedAsyncQueueSize{0};

template <typename ItemType,                              /**< the type of Item to queue */
	  std::size_t MaxSize = UnlimitedAsyncQueueSize,  /**< maximum size for the queue */
	  typename Allocator  = std::allocator<ItemType>> /**< allocator for the items */

class AsyncQueue {
      public:
	using value_type      = ItemType;
	using allocator_type  = Allocator;
	using size_type       = std::size_t;
	using reference       = value_type&;
	using const_reference = const value_type&;
	using pointer         = typename std::allocator_traits<allocator_type>::pointer;
	using const_pointer   = typename std::allocator_traits<allocator_type>::const_pointer;

	using Timeout = std::chrono::steady_clock::duration;

	/**
	 * Push an item to the end of the queue by moving it
	 *
	 * @param item the item to move to the end of the queue
	 * @param timeout and optional timeout
	 *
	 * @return true if the item was pushed; false otherwise.
	 */
	bool push(const value_type& item, Timeout timeout = {}) {
		return push(std::move(value_type(item)), timeout);
	}

	/**
	 * Push an item to the end of the queue by moving it
	 *
	 * @param item the item to move to the end of the queue
	 * @param timeout and optional timeout
	 *
	 * @return true if the item was pushed; false otherwise.
	 */
	bool push(value_type&& item, Timeout timeout = {}) {
		std::unique_lock lock{m_};

		if (!unlimited()) {
			const auto rv = cv_full_.wait_for(lock, timeout, [&]() {
				return !full_unlocked();
			}) && !full_unlocked();
			if (!rv)
				return false;
		}

		q_.emplace_back(std::move(item));
		cv_empty_.notify_one();

		return true;
	}

	/**
	 * Pop an item from the queue
	 *
	 * @param receives the value if the function returns true
	 * @param timeout optional time to wait for an item to become available
	 *
	 * @return true if an item was popped (into val), false otherwise.
	 */
	bool pop(value_type& val, Timeout timeout = {}) {
		std::unique_lock lock{m_};

		if (timeout != Timeout{}) {
			const auto rv = cv_empty_.wait_for(lock, timeout, [&]() {
				return !q_.empty();
			}) && !q_.empty();
			if (!rv)
				return false;

		} else if (q_.empty())
			return false;

		val = std::move(q_.front());
		q_.pop_front();
		cv_full_.notify_one();

		return true;
	}

	/**
	 * Clear the queue
	 *
	 */
	void clear() {
		std::unique_lock lock{m_};
		q_.clear();
		cv_full_.notify_one();
	}

	/**
	 * Size of the queue
	 *
	 *
	 * @return the size
	 */
	size_type size() const {
		std::unique_lock lock{m_};
		return q_.size();
	}

	/**
	 * Maximum size of the queue if specified through the template
	 * parameter; otherwise the (theoretical) max_size of the inner
	 * container.
	 *
	 * @return the maximum size
	 */
	size_type max_size() const { return unlimited() ? q_.max_size() : MaxSize; }

	/**
	 * Is the queue empty?
	 *
	 * @return true or false
	 */
	bool empty() const {
		std::unique_lock lock{m_};
		return q_.empty();
	}

	/**
	 * Is the queue full? Returns false unless a maximum size was specified
	 * (as a template argument)
	 *
	 * @return true or false.
	 */
	bool full() const {
		if (unlimited())
			return false;

		std::unique_lock lock{m_};
		return full_unlocked();
	}

	/**
	 * Is this queue (theoretically) unlimited in size?
	 *
	 * @return true or false
	 */
	constexpr static bool unlimited() { return MaxSize == UnlimitedAsyncQueueSize; }

private:
	bool full_unlocked() const { return q_.size() >= max_size(); }

	std::deque<ItemType, Allocator> q_;
	mutable std::mutex              m_;
	std::condition_variable         cv_full_, cv_empty_;
};

} // namespace Mu

#endif /* __MU_ASYNC_QUEUE_HH__ */