File: thread_queue.h

package info (click to toggle)
paho.mqtt.cpp 1.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,672 kB
  • sloc: cpp: 13,068; ansic: 113; sh: 55; makefile: 22
file content (405 lines) | stat: -rw-r--r-- 14,435 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
/////////////////////////////////////////////////////////////////////////////
/// @file thread_queue.h
/// Implementation of the template class 'thread_queue', a thread-safe,
/// blocking queue for passing data between threads, safe for use with smart
/// pointers.
/// @date 09-Jan-2017
/////////////////////////////////////////////////////////////////////////////

/*******************************************************************************
 * Copyright (c) 2017-2022 Frank Pagliughi <fpagliughi@mindspring.com>
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v20.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Frank Pagliughi - initial implementation and documentation
 *******************************************************************************/

#ifndef __mqtt_thread_queue_h
#define __mqtt_thread_queue_h

#include <algorithm>
#include <condition_variable>
#include <deque>
#include <limits>
#include <mutex>
#include <queue>
#include <thread>

namespace mqtt {

/**
 * Exception that is thrown when operations are performed on a closed
 * queue.
 */
class queue_closed : public std::runtime_error
{
public:
    queue_closed() : std::runtime_error("queue is closed") {}
};

/////////////////////////////////////////////////////////////////////////////

/**
 * A thread-safe queue for inter-thread communication.
 *
 * This is a locking queue with blocking operations. The get() operations
 * can always block on an empty queue, but have variations for non-blocking
 * (try_get) and bounded-time blocking (try_get_for, try_get_until).
 * @par
 * The default queue has a capacity that is unbounded in the practical
 * sense, limited by available memory. In this mode the object will not
 * block when placing values into the queue. A capacity can bet set with the
 * constructor or, at any time later by calling the @ref capacity(size_type)
 * method. Using this latter method, the capacity can be set to an amount
 * smaller than the current size of the queue. In that case all put's to the
 * queue will block until the number of items are removed from the queue to
 * bring the size below the new capacity.
 * @par
 * The queue can be closed. After that, no new items can be placed into it;
 * a `put()` calls will fail. Receivers can still continue to get any items
 * out of the queue that were added before it was closed. Once there are no
 * more items left in the queue after it is closed, it is considered "done".
 * Nothing useful can be done with the queue.
 * @par
 * Note that the queue uses move semantics to place items into the queue and
 * remove items from the queue. This means that the type, T, of the data
 * held by the queue only needs to follow move semantics; not copy
 * semantics. In addition, this means that copies of the value will @em not
 * be left in the queue. This is especially useful when creating queues of
 * shared pointers, as the "dead" part of the queue will not hold onto a
 * reference count after the item has been removed from the queue.
 *
 * @tparam T The type of the items to be held in the queue.
 * @tparam Container The type of the underlying container to use. It must
 * support back(), front(), push_back(), pop_front().
 */
template <typename T, class Container = std::deque<T>>
class thread_queue
{
public:
    /** The underlying container type to use for the queue. */
    using container_type = Container;
    /** The type of items to be held in the queue. */
    using value_type = T;
    /** The type used to specify number of items in the container. */
    using size_type = typename Container::size_type;

    /** The maximum capacity of the queue. */
    static constexpr size_type MAX_CAPACITY = std::numeric_limits<size_type>::max();

private:
    /** Object lock */
    mutable std::mutex lock_;
    /** Condition get signaled when item added to empty queue */
    std::condition_variable notEmptyCond_;
    /** Condition gets signaled then item removed from full queue */
    std::condition_variable notFullCond_;
    /** The capacity of the queue */
    size_type cap_{MAX_CAPACITY};
    /** Whether the queue is closed */
    bool closed_{false};

    /** The actual STL container to hold data */
    std::queue<T, Container> que_;

    /** Simple, scope-based lock guard */
    using guard = std::lock_guard<std::mutex>;
    /** General purpose guard */
    using unique_guard = std::unique_lock<std::mutex>;

    /** Checks if the queue is done (unsafe) */
    bool is_done() const { return closed_ && que_.empty(); }

public:
    /**
     * Constructs a queue with the maximum capacity.
     * This is effectively an unbounded queue.
     */
    thread_queue() {}
    /**
     * Constructs a queue with the specified capacity.
     * This is a bounded queue.
     * @param cap The maximum number of items that can be placed in the
     *  		  queue. The minimum capacity is 1.
     */
    explicit thread_queue(size_t cap) : cap_(std::max<size_type>(cap, 1)) {}
    /**
     * Determine if the queue is empty.
     * @return @em true if there are no elements in the queue, @em false if
     *  	   there are any items in the queue.
     */
    bool empty() const {
        guard g{lock_};
        return que_.empty();
    }
    /**
     * Gets the capacity of the queue.
     * @return The maximum number of elements before the queue is full.
     */
    size_type capacity() const {
        guard g{lock_};
        return cap_;
    }
    /**
     * Sets the capacity of the queue.
     * Note that the capacity can be set to a value smaller than the current
     * size of the queue. In that event, all calls to put() will block until
     * a sufficient number
     */
    void capacity(size_type cap) {
        guard g{lock_};
        cap_ = cap;
    }
    /**
     * Gets the number of items in the queue.
     * @return The number of items in the queue.
     */
    size_type size() const {
        guard g{lock_};
        return que_.size();
    }
    /**
     * Close the queue.
     * Once closed, the queue will not accept any new items, but receievers
     * will still be able to get any remaining items out of the queue until
     * it is empty.
     */
    void close() {
        guard g{lock_};
        closed_ = true;
        notFullCond_.notify_all();
        notEmptyCond_.notify_all();
    }
    /**
     * Determines if the queue is closed.
     * Once closed, the queue will not accept any new items, but receievers
     * will still be able to get any remaining items out of the queue until
     * it is empty.
     * @return @em true if the queue is closed, @false otherwise.
     */
    bool closed() const {
        guard g{lock_};
        return closed_;
    }
    /**
     * Determines if all possible operations are done on the queue.
     * If the queue is closed and empty, then no further useful operations
     * can be done on it.
     * @return @true if the queue is closed and empty, @em false otherwise.
     */
    bool done() const {
        guard g{lock_};
        return is_done();
    }
    /**
     * Clear the contents of the queue.
     * This discards all items in the queue.
     */
    void clear() {
        guard g{lock_};
        while (!que_.empty()) que_.pop();
        notFullCond_.notify_all();
    }
    /**
     * Put an item into the queue.
     * If the queue is full, this will block the caller until items are
     * removed bringing the size less than the capacity.
     * @param val The value to add to the queue.
     */
    void put(value_type val) {
        unique_guard g{lock_};
        notFullCond_.wait(g, [this] { return que_.size() < cap_ || closed_; });
        if (closed_)
            throw queue_closed{};

        que_.emplace(std::move(val));
        notEmptyCond_.notify_one();
    }
    /**
     * Non-blocking attempt to place an item into the queue.
     * @param val The value to add to the queue.
     * @return @em true if the item was added to the queue, @em false if the
     *  	   item was not added because the queue is currently full.
     */
    bool try_put(value_type val) {
        guard g{lock_};
        if (que_.size() >= cap_ || closed_)
            return false;

        que_.emplace(std::move(val));
        notEmptyCond_.notify_one();
        return true;
    }
    /**
     * Attempt to place an item in the queue with a bounded wait.
     * This will attempt to place the value in the queue, but if it is full,
     * it will wait up to the specified time duration before timing out.
     * @param val The value to add to the queue.
     * @param relTime The amount of time to wait until timing out.
     * @return @em true if the value was added to the queue, @em false if a
     *  	   timeout occurred.
     */
    template <typename Rep, class Period>
    bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
        unique_guard g{lock_};
        bool to = !notFullCond_.wait_for(g, relTime, [this] {
            return que_.size() < cap_ || closed_;
        });
        if (to || closed_)
            return false;

        que_.emplace(std::move(val));
        notEmptyCond_.notify_one();
        return true;
    }
    /**
     * Attempt to place an item in the queue with a bounded wait to an
     * absolute time point.
     * This will attempt to place the value in the queue, but if it is full,
     * it will wait up until the specified time before timing out.
     * @param val The value to add to the queue.
     * @param absTime The absolute time to wait to before timing out.
     * @return @em true if the value was added to the queue, @em false if a
     *  	   timeout occurred.
     */
    template <class Clock, class Duration>
    bool try_put_until(
        value_type val, const std::chrono::time_point<Clock, Duration>& absTime
    ) {
        unique_guard g{lock_};
        bool to = !notFullCond_.wait_until(g, absTime, [this] {
            return que_.size() < cap_ || closed_;
        });

        if (to || closed_)
            return false;

        que_.emplace(std::move(val));
        notEmptyCond_.notify_one();
        return true;
    }
    /**
     * Retrieve a value from the queue.
     * If the queue is empty, this will block indefinitely until a value is
     * added to the queue by another thread,
     * @param val Pointer to a variable to receive the value.
     */
    bool get(value_type* val) {
        if (!val)
            return false;

        unique_guard g{lock_};
        notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
        if (que_.empty())  // We must be done
            return false;

        *val = std::move(que_.front());
        que_.pop();
        notFullCond_.notify_one();
        return true;
    }
    /**
     * Retrieve a value from the queue.
     * If the queue is empty, this will block indefinitely until a value is
     * added to the queue by another thread,
     * @return The value removed from the queue
     */
    value_type get() {
        unique_guard g{lock_};
        notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
        if (que_.empty())  // We must be done
            throw queue_closed{};

        value_type val = std::move(que_.front());
        que_.pop();
        notFullCond_.notify_one();
        return val;
    }
    /**
     * Attempts to remove a value from the queue without blocking.
     * If the queue is currently empty, this will return immediately with a
     * failure, otherwise it will get the next value and return it.
     * @param val Pointer to a variable to receive the value.
     * @return @em true if a value was removed from the queue, @em false if
     *  	   the queue is empty.
     */
    bool try_get(value_type* val) {
        if (!val)
            return false;

        guard g{lock_};
        if (que_.empty())
            return false;

        *val = std::move(que_.front());
        que_.pop();
        notFullCond_.notify_one();
        return true;
    }
    /**
     * Attempt to remove an item from the queue for a bounded amount of time.
     * This will retrieve the next item from the queue. If the queue is
     * empty, it will wait the specified amount of time for an item to arrive
     * before timing out.
     * @param val Pointer to a variable to receive the value.
     * @param relTime The amount of time to wait until timing out.
     * @return @em true if the value was removed the queue, @em false if a
     *  	   timeout occurred.
     */
    template <typename Rep, class Period>
    bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
        if (!val)
            return false;

        unique_guard g{lock_};
        notEmptyCond_.wait_for(g, relTime, [this] { return !que_.empty() || closed_; });

        if (que_.empty())
            return false;

        *val = std::move(que_.front());
        que_.pop();
        notFullCond_.notify_one();
        return true;
    }
    /**
     * Attempt to remove an item from the queue for a bounded amount of time.
     * This will retrieve the next item from the queue. If the queue is
     * empty, it will wait until the specified time for an item to arrive
     * before timing out.
     * @param val Pointer to a variable to receive the value.
     * @param absTime The absolute time to wait to before timing out.
     * @return @em true if the value was removed from the queue, @em false
     *  	   if a timeout occurred.
     */
    template <class Clock, class Duration>
    bool try_get_until(
        value_type* val, const std::chrono::time_point<Clock, Duration>& absTime
    ) {
        if (!val)
            return false;

        unique_guard g{lock_};
        notEmptyCond_.wait_until(g, absTime, [this] { return !que_.empty() || closed_; });
        if (que_.empty())
            return false;

        *val = std::move(que_.front());
        que_.pop();
        notFullCond_.notify_one();
        return true;
    }
};

/////////////////////////////////////////////////////////////////////////////
}  // namespace mqtt

#endif  // __mqtt_thread_queue_h