File: rwqueue.h

package info (click to toggle)
libcds 2.3.3-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 15,632 kB
  • sloc: cpp: 135,002; ansic: 7,234; perl: 243; sh: 237; makefile: 6
file content (394 lines) | stat: -rw-r--r-- 13,167 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
// Copyright (c) 2006-2018 Maxim Khizhinsky
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)

#ifndef CDSLIB_CONTAINER_RWQUEUE_H
#define CDSLIB_CONTAINER_RWQUEUE_H

#include <cds/sync/spinlock.h>
#include <cds/opt/options.h>
#include <cds/details/allocator.h>
#include <mutex>        // unique_lock
#include <memory>

namespace cds { namespace container {
    /// RWQueue related definitions
    /** @ingroup cds_nonintrusive_helper
    */
    namespace rwqueue {
        /// RWQueue default type traits
        struct traits
        {
            /// Lock policy
            typedef cds::sync::spin  lock_type;

            /// Node allocator
            typedef CDS_DEFAULT_ALLOCATOR   allocator;

            /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
            typedef cds::atomicity::empty_item_counter item_counter;

            /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
            enum { padding = opt::cache_line_padding };
        };

        /// Metafunction converting option list to \p rwqueue::traits
        /**
            Supported \p Options are:
            - opt::lock_type - lock policy, default is \p cds::sync::spin. Any type satisfied \p Mutex C++ concept may be used.
            - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
            - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
                To enable item counting use \p cds::atomicity::item_counter.
            - \p opt::padding - padding for internal critical data. Default is \p opt::cache_line_padding

            Example: declare mutex-based \p %RWQueue with item counting
            \code
            typedef cds::container::RWQueue< Foo,
                typename cds::container::rwqueue::make_traits<
                    cds::opt::item_counter< cds::atomicity::item_counter >,
                    cds::opt::lock_type< std::mutex >
                >::type
            > myQueue;
            \endcode
        */
        template <typename... Options>
        struct make_traits {
#   ifdef CDS_DOXYGEN_INVOKED
            typedef implementation_defined type;   ///< Metafunction result
#   else
            typedef typename cds::opt::make_options<
                typename cds::opt::find_type_traits< traits, Options... >::type
                , Options...
            >::type type;
#   endif
        };

    } // namespace rwqueue

    /// Michael & Scott blocking queue with fine-grained synchronization schema
    /** @ingroup cds_nonintrusive_queue
        The queue has two different locks: one for reading and one for writing.
        Therefore, one writer and one reader can simultaneously access to the queue.
        The queue does not require any garbage collector.

        <b>Source</b>
            - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
                and blocking concurrent queue algorithms"

        <b>Template arguments</b>
        - \p T - value type to be stored in the queue
        - \p Traits - queue traits, default is \p rwqueue::traits. You can use \p rwqueue::make_traits
            metafunction to make your traits or just derive your traits from \p %rwqueue::traits:
            \code
            struct myTraits: public cds::container::rwqueue::traits {
                typedef cds::atomicity::item_counter    item_counter;
            };
            typedef cds::container::RWQueue< Foo, myTraits > myQueue;

            // Equivalent make_traits example:
            typedef cds::container::RWQueue< Foo,
                typename cds::container::rwqueue::make_traits<
                    cds::opt::item_counter< cds::atomicity::item_counter >
                >::type
            > myQueue;
            \endcode
    */
    template <typename T, typename Traits = rwqueue::traits >
    class RWQueue
    {
    public:
        /// Rebind template arguments
        template <typename T2, typename Traits2>
        struct rebind {
            typedef RWQueue< T2, Traits2 > other   ;   ///< Rebinding result
        };

    public:
        typedef T       value_type; ///< Type of value to be stored in the queue
        typedef Traits  traits;     ///< Queue traits

        typedef typename traits::lock_type    lock_type;    ///< Locking primitive
        typedef typename traits::item_counter item_counter; ///< Item counting policy used

    protected:
        //@cond
        /// Node type
        struct node_type
        {
            atomics::atomic< node_type *> m_pNext;  ///< Pointer to the next node in the queue
            value_type              m_value;        ///< Value stored in the node

            node_type( value_type const& v )
                : m_pNext( nullptr )
                , m_value(v)
            {}

            node_type()
                : m_pNext( nullptr )
            {}

            template <typename... Args>
            node_type( Args&&... args )
                : m_pNext( nullptr )
                , m_value( std::forward<Args>(args)...)
            {}
        };
        //@endcond

    public:
        /// Allocator type used for allocate/deallocate the queue nodes
        typedef typename std::allocator_traits<
            typename traits::allocator
        >::template rebind_alloc<node_type> allocator_type; 

    protected:
        //@cond
        typedef std::unique_lock<lock_type> scoped_lock;
        typedef cds::details::Allocator< node_type, allocator_type >  node_allocator;

        struct head_type {
            mutable lock_type lock;
            node_type *       ptr;
        };

        head_type m_Head;
        typename opt::details::apply_padding< head_type, traits::padding >::padding_type pad_;
        head_type m_Tail;

        item_counter    m_ItemCounter;
        //@endcond

    protected:
        //@cond
        static node_type * alloc_node()
        {
            return node_allocator().New();
        }

        static node_type * alloc_node( T const& data )
        {
            return node_allocator().New( data );
        }

        template <typename... Args>
        static node_type * alloc_node_move( Args&&... args )
        {
            return node_allocator().MoveNew( std::forward<Args>( args )... );
        }

        static void free_node( node_type * pNode )
        {
            node_allocator().Delete( pNode );
        }

        bool enqueue_node( node_type * p )
        {
            assert( p != nullptr );
            {
                scoped_lock lock( m_Tail.lock );
                m_Tail.ptr->m_pNext.store( p, atomics::memory_order_release );
                m_Tail.ptr = p;
            }
            ++m_ItemCounter;
            return true;
        }

        struct node_disposer {
            void operator()( node_type * pNode )
            {
                free_node( pNode );
            }
        };
        typedef std::unique_ptr< node_type, node_disposer >     scoped_node_ptr;
        //@endcond

    public:
        /// Makes empty queue
        RWQueue()
        {
            node_type * pNode = alloc_node();
            m_Head.ptr =
                m_Tail.ptr = pNode;
        }

        /// Destructor clears queue
        ~RWQueue()
        {
            clear();
            assert( m_Head.ptr == m_Tail.ptr );
            free_node( m_Head.ptr );
        }

        /// Enqueues \p data. Always return \a true
        bool enqueue( value_type const& data )
        {
            scoped_node_ptr p( alloc_node( data ));
            if ( enqueue_node( p.get())) {
                p.release();
                return true;
            }
            return false;
        }

        /// Enqueues \p data, move semantics
        bool enqueue( value_type&& data )
        {
            scoped_node_ptr p( alloc_node_move( std::move( data )));
            if ( enqueue_node( p.get())) {
                p.release();
                return true;
            }
            return false;
        }

        /// Enqueues \p data to the queue using a functor
        /**
            \p Func is a functor called to create node.
            The functor \p f takes one argument - a reference to a new node of type \ref value_type :
            \code
            cds::container::RWQueue< cds::gc::HP, Foo > myQueue;
            Bar bar;
            myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = bar; } );
            \endcode
        */
        template <typename Func>
        bool enqueue_with( Func f )
        {
            scoped_node_ptr p( alloc_node());
            f( p->m_value );
            if ( enqueue_node( p.get())) {
                p.release();
                return true;
            }
            return false;
        }

        /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
        template <typename... Args>
        bool emplace( Args&&... args )
        {
            scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
            if ( enqueue_node( p.get())) {
                p.release();
                return true;
            }
            return false;
        }

        /// Synonym for \p enqueue( value_type const& ) function
        bool push( value_type const& val )
        {
            return enqueue( val );
        }

        /// Synonym for \p enqueue( value_type&& ) function
        bool push( value_type&& val )
        {
            return enqueue( std::move( val ));
        }

        /// Synonym for \p enqueue_with() function
        template <typename Func>
        bool push_with( Func f )
        {
            return enqueue_with( f );
        }

        /// Dequeues a value to \p dest.
        /**
            If queue is empty returns \a false, \p dest can be corrupted.
            If queue is not empty returns \a true, \p dest contains the value dequeued
        */
        bool dequeue( value_type& dest )
        {
            return dequeue_with( [&dest]( value_type& src ) { dest = std::move( src ); });
        }

        /// Dequeues a value using a functor
        /**
            \p Func is a functor called to copy dequeued value.
            The functor takes one argument - a reference to removed node:
            \code
            cds:container::RWQueue< cds::gc::HP, Foo > myQueue;
            Bar bar;
            myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
            \endcode
            The functor is called only if the queue is not empty.
        */
        template <typename Func>
        bool dequeue_with( Func f )
        {
            node_type * pNode;
            {
                scoped_lock lock( m_Head.lock );
                pNode = m_Head.ptr;
                node_type * pNewHead = pNode->m_pNext.load( atomics::memory_order_acquire );
                if ( pNewHead == nullptr )
                    return false;
                f( pNewHead->m_value );
                m_Head.ptr = pNewHead;
            }    // unlock here
            --m_ItemCounter;
            free_node( pNode );
            return true;
        }

        /// Synonym for \p dequeue() function
        bool pop( value_type& dest )
        {
            return dequeue( dest );
        }

        /// Synonym for \p dequeue_with() function
        template <typename Func>
        bool pop_with( Func f )
        {
            return dequeue_with( f );
        }

        /// Checks if queue is empty
        bool empty() const
        {
            scoped_lock lock( m_Head.lock );
            return m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) == nullptr;
        }

        /// Clears queue
        void clear()
        {
            scoped_lock lockR( m_Head.lock );
            scoped_lock lockW( m_Tail.lock );
            while ( m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) != nullptr ) {
                node_type * pHead = m_Head.ptr;
                m_Head.ptr = m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed );
                free_node( pHead );
            }
            m_ItemCounter.reset();
        }

        /// Returns queue's item count
        /**
            The value returned depends on \p rwqueue::traits::item_counter. For \p atomicity::empty_item_counter,
            this function always returns 0.

            @note Even if you use real item counter and it returns 0, this fact is not mean that the queue
            is empty. To check queue emptyness use \p empty() method.
        */
        size_t    size() const
        {
            return m_ItemCounter.value();
        }

        //@cond
        /// The class has no internal statistics. For test consistency only
        std::nullptr_t statistics() const
        {
            return nullptr;
        }
        //@endcond
    };

}}  // namespace cds::container

#endif // #ifndef CDSLIB_CONTAINER_RWQUEUE_H