1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405
|
/////////////////////////////////////////////////////////////////////////////
/// @file thread_queue.h
/// Implementation of the template class 'thread_queue', a thread-safe,
/// blocking queue for passing data between threads, safe for use with smart
/// pointers.
/// @date 09-Jan-2017
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2017-2022 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v20.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_thread_queue_h
#define __mqtt_thread_queue_h
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <limits>
#include <mutex>
#include <queue>
#include <thread>
namespace mqtt {
/**
* Exception that is thrown when operations are performed on a closed
* queue.
*/
class queue_closed : public std::runtime_error
{
public:
queue_closed() : std::runtime_error("queue is closed") {}
};
/////////////////////////////////////////////////////////////////////////////
/**
* A thread-safe queue for inter-thread communication.
*
* This is a locking queue with blocking operations. The get() operations
* can always block on an empty queue, but have variations for non-blocking
* (try_get) and bounded-time blocking (try_get_for, try_get_until).
* @par
* The default queue has a capacity that is unbounded in the practical
* sense, limited by available memory. In this mode the object will not
* block when placing values into the queue. A capacity can bet set with the
* constructor or, at any time later by calling the @ref capacity(size_type)
* method. Using this latter method, the capacity can be set to an amount
* smaller than the current size of the queue. In that case all put's to the
* queue will block until the number of items are removed from the queue to
* bring the size below the new capacity.
* @par
* The queue can be closed. After that, no new items can be placed into it;
* a `put()` calls will fail. Receivers can still continue to get any items
* out of the queue that were added before it was closed. Once there are no
* more items left in the queue after it is closed, it is considered "done".
* Nothing useful can be done with the queue.
* @par
* Note that the queue uses move semantics to place items into the queue and
* remove items from the queue. This means that the type, T, of the data
* held by the queue only needs to follow move semantics; not copy
* semantics. In addition, this means that copies of the value will @em not
* be left in the queue. This is especially useful when creating queues of
* shared pointers, as the "dead" part of the queue will not hold onto a
* reference count after the item has been removed from the queue.
*
* @tparam T The type of the items to be held in the queue.
* @tparam Container The type of the underlying container to use. It must
* support back(), front(), push_back(), pop_front().
*/
template <typename T, class Container = std::deque<T>>
class thread_queue
{
public:
/** The underlying container type to use for the queue. */
using container_type = Container;
/** The type of items to be held in the queue. */
using value_type = T;
/** The type used to specify number of items in the container. */
using size_type = typename Container::size_type;
/** The maximum capacity of the queue. */
static constexpr size_type MAX_CAPACITY = std::numeric_limits<size_type>::max();
private:
/** Object lock */
mutable std::mutex lock_;
/** Condition get signaled when item added to empty queue */
std::condition_variable notEmptyCond_;
/** Condition gets signaled then item removed from full queue */
std::condition_variable notFullCond_;
/** The capacity of the queue */
size_type cap_{MAX_CAPACITY};
/** Whether the queue is closed */
bool closed_{false};
/** The actual STL container to hold data */
std::queue<T, Container> que_;
/** Simple, scope-based lock guard */
using guard = std::lock_guard<std::mutex>;
/** General purpose guard */
using unique_guard = std::unique_lock<std::mutex>;
/** Checks if the queue is done (unsafe) */
bool is_done() const { return closed_ && que_.empty(); }
public:
/**
* Constructs a queue with the maximum capacity.
* This is effectively an unbounded queue.
*/
thread_queue() {}
/**
* Constructs a queue with the specified capacity.
* This is a bounded queue.
* @param cap The maximum number of items that can be placed in the
* queue. The minimum capacity is 1.
*/
explicit thread_queue(size_t cap) : cap_(std::max<size_type>(cap, 1)) {}
/**
* Determine if the queue is empty.
* @return @em true if there are no elements in the queue, @em false if
* there are any items in the queue.
*/
bool empty() const {
guard g{lock_};
return que_.empty();
}
/**
* Gets the capacity of the queue.
* @return The maximum number of elements before the queue is full.
*/
size_type capacity() const {
guard g{lock_};
return cap_;
}
/**
* Sets the capacity of the queue.
* Note that the capacity can be set to a value smaller than the current
* size of the queue. In that event, all calls to put() will block until
* a sufficient number
*/
void capacity(size_type cap) {
guard g{lock_};
cap_ = cap;
}
/**
* Gets the number of items in the queue.
* @return The number of items in the queue.
*/
size_type size() const {
guard g{lock_};
return que_.size();
}
/**
* Close the queue.
* Once closed, the queue will not accept any new items, but receievers
* will still be able to get any remaining items out of the queue until
* it is empty.
*/
void close() {
guard g{lock_};
closed_ = true;
notFullCond_.notify_all();
notEmptyCond_.notify_all();
}
/**
* Determines if the queue is closed.
* Once closed, the queue will not accept any new items, but receievers
* will still be able to get any remaining items out of the queue until
* it is empty.
* @return @em true if the queue is closed, @false otherwise.
*/
bool closed() const {
guard g{lock_};
return closed_;
}
/**
* Determines if all possible operations are done on the queue.
* If the queue is closed and empty, then no further useful operations
* can be done on it.
* @return @true if the queue is closed and empty, @em false otherwise.
*/
bool done() const {
guard g{lock_};
return is_done();
}
/**
* Clear the contents of the queue.
* This discards all items in the queue.
*/
void clear() {
guard g{lock_};
while (!que_.empty()) que_.pop();
notFullCond_.notify_all();
}
/**
* Put an item into the queue.
* If the queue is full, this will block the caller until items are
* removed bringing the size less than the capacity.
* @param val The value to add to the queue.
*/
void put(value_type val) {
unique_guard g{lock_};
notFullCond_.wait(g, [this] { return que_.size() < cap_ || closed_; });
if (closed_)
throw queue_closed{};
que_.emplace(std::move(val));
notEmptyCond_.notify_one();
}
/**
* Non-blocking attempt to place an item into the queue.
* @param val The value to add to the queue.
* @return @em true if the item was added to the queue, @em false if the
* item was not added because the queue is currently full.
*/
bool try_put(value_type val) {
guard g{lock_};
if (que_.size() >= cap_ || closed_)
return false;
que_.emplace(std::move(val));
notEmptyCond_.notify_one();
return true;
}
/**
* Attempt to place an item in the queue with a bounded wait.
* This will attempt to place the value in the queue, but if it is full,
* it will wait up to the specified time duration before timing out.
* @param val The value to add to the queue.
* @param relTime The amount of time to wait until timing out.
* @return @em true if the value was added to the queue, @em false if a
* timeout occurred.
*/
template <typename Rep, class Period>
bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
unique_guard g{lock_};
bool to = !notFullCond_.wait_for(g, relTime, [this] {
return que_.size() < cap_ || closed_;
});
if (to || closed_)
return false;
que_.emplace(std::move(val));
notEmptyCond_.notify_one();
return true;
}
/**
* Attempt to place an item in the queue with a bounded wait to an
* absolute time point.
* This will attempt to place the value in the queue, but if it is full,
* it will wait up until the specified time before timing out.
* @param val The value to add to the queue.
* @param absTime The absolute time to wait to before timing out.
* @return @em true if the value was added to the queue, @em false if a
* timeout occurred.
*/
template <class Clock, class Duration>
bool try_put_until(
value_type val, const std::chrono::time_point<Clock, Duration>& absTime
) {
unique_guard g{lock_};
bool to = !notFullCond_.wait_until(g, absTime, [this] {
return que_.size() < cap_ || closed_;
});
if (to || closed_)
return false;
que_.emplace(std::move(val));
notEmptyCond_.notify_one();
return true;
}
/**
* Retrieve a value from the queue.
* If the queue is empty, this will block indefinitely until a value is
* added to the queue by another thread,
* @param val Pointer to a variable to receive the value.
*/
bool get(value_type* val) {
if (!val)
return false;
unique_guard g{lock_};
notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
if (que_.empty()) // We must be done
return false;
*val = std::move(que_.front());
que_.pop();
notFullCond_.notify_one();
return true;
}
/**
* Retrieve a value from the queue.
* If the queue is empty, this will block indefinitely until a value is
* added to the queue by another thread,
* @return The value removed from the queue
*/
value_type get() {
unique_guard g{lock_};
notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
if (que_.empty()) // We must be done
throw queue_closed{};
value_type val = std::move(que_.front());
que_.pop();
notFullCond_.notify_one();
return val;
}
/**
* Attempts to remove a value from the queue without blocking.
* If the queue is currently empty, this will return immediately with a
* failure, otherwise it will get the next value and return it.
* @param val Pointer to a variable to receive the value.
* @return @em true if a value was removed from the queue, @em false if
* the queue is empty.
*/
bool try_get(value_type* val) {
if (!val)
return false;
guard g{lock_};
if (que_.empty())
return false;
*val = std::move(que_.front());
que_.pop();
notFullCond_.notify_one();
return true;
}
/**
* Attempt to remove an item from the queue for a bounded amount of time.
* This will retrieve the next item from the queue. If the queue is
* empty, it will wait the specified amount of time for an item to arrive
* before timing out.
* @param val Pointer to a variable to receive the value.
* @param relTime The amount of time to wait until timing out.
* @return @em true if the value was removed the queue, @em false if a
* timeout occurred.
*/
template <typename Rep, class Period>
bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
if (!val)
return false;
unique_guard g{lock_};
notEmptyCond_.wait_for(g, relTime, [this] { return !que_.empty() || closed_; });
if (que_.empty())
return false;
*val = std::move(que_.front());
que_.pop();
notFullCond_.notify_one();
return true;
}
/**
* Attempt to remove an item from the queue for a bounded amount of time.
* This will retrieve the next item from the queue. If the queue is
* empty, it will wait until the specified time for an item to arrive
* before timing out.
* @param val Pointer to a variable to receive the value.
* @param absTime The absolute time to wait to before timing out.
* @return @em true if the value was removed from the queue, @em false
* if a timeout occurred.
*/
template <class Clock, class Duration>
bool try_get_until(
value_type* val, const std::chrono::time_point<Clock, Duration>& absTime
) {
if (!val)
return false;
unique_guard g{lock_};
notEmptyCond_.wait_until(g, absTime, [this] { return !que_.empty() || closed_; });
if (que_.empty())
return false;
*val = std::move(que_.front());
que_.pop();
notFullCond_.notify_one();
return true;
}
};
/////////////////////////////////////////////////////////////////////////////
} // namespace mqtt
#endif // __mqtt_thread_queue_h
|