File: mspriority_queue.h

package info (click to toggle)
libcds 2.3.3-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 15,632 kB
  • sloc: cpp: 135,002; ansic: 7,234; perl: 243; sh: 237; makefile: 6
file content (518 lines) | stat: -rw-r--r-- 19,429 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
// Copyright (c) 2006-2018 Maxim Khizhinsky
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)

#ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H
#define CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H

#include <mutex>  // std::unique_lock
#include <cds/intrusive/details/base.h>
#include <cds/sync/spinlock.h>
#include <cds/os/thread.h>
#include <cds/details/bit_reverse_counter.h>
#include <cds/intrusive/options.h>
#include <cds/opt/buffer.h>
#include <cds/opt/compare.h>
#include <cds/details/bounded_container.h>

namespace cds { namespace intrusive {

    /// MSPriorityQueue related definitions
    /** @ingroup cds_intrusive_helper
    */
    namespace mspriority_queue {

        /// Event-counting statistics for MSPriorityQueue
        /**
            \p Counter is the type used for every event counter of the structure.
            Each \p onXXX() handler increments exactly one counter.
        */
        template <typename Counter = cds::atomicity::event_counter>
        struct stat {
            using event_counter = Counter;  ///< Event counter type

            event_counter   m_nPushCount;            ///< Number of successful push operations
            event_counter   m_nPopCount;             ///< Number of successful pop operations
            event_counter   m_nPushFailCount;        ///< Number of failed push operations ("the queue is full")
            event_counter   m_nPopFailCount;         ///< Number of failed pop operations ("the queue is empty")
            event_counter   m_nPushHeapifySwapCount; ///< Number of item swaps made while heapifying in push
            event_counter   m_nPopHeapifySwapCount;  ///< Number of item swaps made while heapifying in pop
            event_counter   m_nItemMovedTop;         ///< Number of times \p push() found that the inserted item had been moved to the top by a concurrent \p pop()
            event_counter   m_nItemMovedUp;          ///< Number of times \p push() found that the inserted item had been moved upwards by a concurrent \p pop()
            event_counter   m_nPushEmptyPass;        ///< Number of empty heapify passes caused by concurrent operations

            //@cond
            // Push/pop outcome events
            void onPushSuccess()
            {
                ++m_nPushCount;
            }

            void onPopSuccess()
            {
                ++m_nPopCount;
            }

            void onPushFailed()
            {
                ++m_nPushFailCount;
            }

            void onPopFailed()
            {
                ++m_nPopFailCount;
            }

            // Heapify swap events
            void onPushHeapifySwap()
            {
                ++m_nPushHeapifySwapCount;
            }

            void onPopHeapifySwap()
            {
                ++m_nPopHeapifySwapCount;
            }

            // Events caused by interference between push and a concurrent pop
            void onItemMovedTop()
            {
                ++m_nItemMovedTop;
            }

            void onItemMovedUp()
            {
                ++m_nItemMovedUp;
            }

            void onPushEmptyPass()
            {
                ++m_nPushEmptyPass;
            }
            //@endcond
        };

        /// No-op statistics for MSPriorityQueue
        /**
            Declares the same set of event handlers as \p mspriority_queue::stat,
            but every handler has an empty body and the structure has no data members.
        */
        struct empty_stat {
            //@cond
            // Push/pop outcome events
            void onPushSuccess() const
            {}

            void onPopSuccess() const
            {}

            void onPushFailed() const
            {}

            void onPopFailed() const
            {}

            // Heapify swap events
            void onPushHeapifySwap() const
            {}

            void onPopHeapifySwap() const
            {}

            // Events caused by interference between push and a concurrent pop
            void onItemMovedTop() const
            {}

            void onItemMovedUp() const
            {}

            void onPushEmptyPass() const
            {}
            //@endcond
        };

        /// Default traits of MSPriorityQueue
        struct traits {
            /// Heap array storage
            /**
                Buffer type that holds the heap array.
                The default is \p cds::opt::v::initialized_dynamic_buffer.

                The value type given to the buffer here does not matter: at instantiation time
                the queue calls the \p buffer::rebind member metafunction to replace it
                with its own node type.
            */
            using buffer = opt::v::initialized_dynamic_buffer<void *>;

            /// Priority comparing functor
            /**
                There is no default comparing functor. When this option is not specified,
                the \p less predicate is used instead.
            */
            using compare = opt::none;

            /// Binary predicate used for comparing priorities
            /**
                Default is \p std::less<T>.
            */
            using less = opt::none;

            /// Mutual-exclusion lock type. The lock need not be recursive.
            using lock_type = cds::sync::spin;

            /// Back-off strategy
            using back_off = backoff::Default;

            /// Internal statistics
            /**
                Either \p mspriority_queue::empty_stat (the default, no overhead),
                \p mspriority_queue::stat, or any other type providing
                the interface of \p %mspriority_queue::stat.
            */
            using stat = empty_stat;
        };

        /// Metafunction that builds a traits class from a list of options
        /**
            Supported \p Options:
            - \p opt::buffer - buffer type of the heap array. Possible types are:
                \p opt::v::initialized_static_buffer, \p opt::v::initialized_dynamic_buffer.
                Default is \p %opt::v::initialized_dynamic_buffer.
                The value type of the buffer may be anything: at instantiation time
                the \p buffer::rebind member metafunction is called to change the type of values stored in the buffer.
            - \p opt::compare - priority comparing functor. No default functor is provided.
                If the option is not specified, \p opt::less is used.
            - \p opt::less - binary predicate used for comparing priorities. Default is \p std::less<T>.
            - \p opt::lock_type - lock type. Default is \p cds::sync::spin
            - \p opt::back_off - back-off strategy. Default is \p cds::backoff::Default
            - \p opt::stat - internal statistics. Available types: \p mspriority_queue::stat,
                \p mspriority_queue::empty_stat (the default, no overhead)

            Options that are not specified keep the values defined in \p mspriority_queue::traits.
        */
        template <typename... Options>
        struct make_traits {
#   ifndef CDS_DOXYGEN_INVOKED
            using type = typename cds::opt::make_options<
                typename cds::opt::find_type_traits< traits, Options... >::type,
                Options...
            >::type;
#   else
            using type = implementation_defined;    ///< Metafunction result
#   endif
        };

    }   // namespace mspriority_queue

    /// Michael & Scott array-based lock-based concurrent priority queue heap
    /** @ingroup cds_intrusive_priority_queue
        Source:
            - [1996] G.Hunt, M.Michael, S. Parthasarathy, M.Scott
                "An efficient algorithm for concurrent priority queue heaps"

        \p %MSPriorityQueue augments the standard array-based heap data structure with
        a mutual-exclusion lock on the heap's size and locks on each node in the heap.
        Each node also has a tag that indicates whether
        it is empty, valid, or in a transient state due to an update to the heap
        by an inserting thread.
        The algorithm allows concurrent insertions and deletions in opposite directions,
        without risking deadlock and without the need for special server threads.
        It also uses a "bit-reversal" technique to scatter accesses across the fringe
        of the tree to reduce contention.
        On large heaps the algorithm achieves significant performance improvements
        over serialized single-lock algorithm, for various insertion/deletion
        workloads. For small heaps it still performs well, but not as well as
        single-lock algorithm.

        The queue is intrusive: it stores pointers to the items passed to \p push()
        and never copies them.

        Template parameters:
        - \p T - type to be stored in the queue. The priority is a part of \p T type.
        - \p Traits - type traits. See \p mspriority_queue::traits for explanation.
            It is possible to declare option-based queue with \p cds::intrusive::mspriority_queue::make_traits
            metafunction instead of \p Traits template argument.
    */
    template <typename T, class Traits = mspriority_queue::traits >
    class MSPriorityQueue: public cds::bounded_container
    {
    public:
        typedef T           value_type  ;   ///< Value type stored in the queue
        typedef Traits      traits      ;   ///< Traits template parameter

#   ifdef CDS_DOXYGEN_INVOKED
        typedef implementation_defined key_comparator  ;    ///< priority comparing functor based on opt::compare and opt::less option setter.
#   else
        typedef typename opt::details::make_comparator< value_type, traits >::type key_comparator;
#   endif

        typedef typename traits::lock_type      lock_type;   ///< heap's size lock type (also used for the per-node locks)
        typedef typename traits::back_off       back_off;    ///< Back-off strategy
        typedef typename traits::stat           stat;        ///< internal statistics type, see \p mspriority_queue::traits::stat
        typedef typename cds::bitop::bit_reverse_counter<> item_counter;///< Item counter type

    protected:
        //@cond
        // Type of a node tag. A tag holds either one of the tag_value constants
        // converted to tag_type, or the id of the thread that is currently
        // inserting the node's item (see push()).
        typedef cds::OS::ThreadId   tag_type;

        // Special tag values
        enum tag_value {
            Available   = -1,   // the node holds an item that is not being moved by an inserting thread
            Empty       = 0     // the node holds no item
        };
        //@endcond

        //@cond
        /// Heap item type
        struct node {
            value_type *        m_pVal  ;   ///< A value pointer
            tag_type volatile   m_nTag  ;   ///< A tag
            mutable lock_type   m_Lock  ;   ///< Node-level lock

            /// Creates empty node
            node()
                : m_pVal( nullptr )
                , m_nTag( tag_type(Empty))
            {}

            /// Lock the node
            void lock()
            {
                m_Lock.lock();
            }

            /// Unlock the node
            void unlock()
            {
                m_Lock.unlock();
            }
        };
        //@endcond

    public:
        typedef typename traits::buffer::template rebind<node>::other   buffer_type ;   ///< Heap array buffer type

        //@cond
        typedef typename item_counter::counter_type    counter_type;
        //@endcond

    protected:
        item_counter        m_ItemCounter   ;   ///< Item counter
        mutable lock_type   m_Lock          ;   ///< Heap's size lock
        buffer_type         m_Heap          ;   ///< Heap array
        stat                m_Stat          ;   ///< internal statistics accumulator

    public:
        /// Constructs empty priority queue
        /**
            \p nCapacity is passed to the heap buffer. Slot 0 of the buffer is not used,
            so \p capacity() reports one item less than the capacity of the buffer.

            For \p cds::opt::v::initialized_static_buffer the \p nCapacity parameter is ignored.
        */
        MSPriorityQueue( size_t nCapacity )
            : m_Heap( nCapacity )
        {}

        /// Clears priority queue and destructs the object
        ~MSPriorityQueue()
        {
            clear();
        }

        /// Inserts an item into priority queue
        /**
            If the priority queue is full, the function returns \p false,
            no item has been added.
            Otherwise, the function inserts the pointer to \p val into the heap
            and returns \p true.

            The function does not make a copy of \p val.
        */
        bool push( value_type& val )
        {
            // The id of the current thread is used as the tag of the item
            // while it is being moved towards the top of the heap
            tag_type const curId = cds::OS::get_current_thread_id();

            // Insert new item at bottom of the heap
            m_Lock.lock();
            if ( m_ItemCounter.value() >= capacity()) {
                // the heap is full
                m_Lock.unlock();
                m_Stat.onPushFailed();
                return false;
            }

            // Reserve a slot for the new item while the size lock is held
            counter_type i = m_ItemCounter.inc();
            assert( i < m_Heap.capacity());

            // The node lock is taken before the size lock is released
            node& refNode = m_Heap[i];
            refNode.lock();
            m_Lock.unlock();
            assert( refNode.m_nTag == tag_type( Empty ));
            assert( refNode.m_pVal == nullptr );
            refNode.m_pVal = &val;
            refNode.m_nTag = curId;
            refNode.unlock();

            // Move item towards top of heap while it has a higher priority than its parent
            heapify_after_push( i, curId );

            m_Stat.onPushSuccess();
            return true;
        }

        /// Extracts item with high priority
        /**
            If the priority queue is empty, the function returns \p nullptr.
            Otherwise, it returns the item extracted.
        */
        value_type * pop()
        {
            // The top of the heap is slot 1, slot 0 is not used
            node& refTop = m_Heap[1];

            m_Lock.lock();
            if ( m_ItemCounter.value() == 0 ) {
                // the heap is empty
                m_Lock.unlock();
                m_Stat.onPopFailed();
                return nullptr;
            }
            // Index of the bottom node whose item is removed from the heap's fringe
            counter_type nBottom = m_ItemCounter.dec();
            assert( nBottom < m_Heap.capacity());
            assert( nBottom > 0 );

            refTop.lock();
            if ( nBottom == 1 ) {
                // The bottom node is the top node: just take its item.
                // The size lock is held until the top node is emptied.
                refTop.m_nTag = tag_type( Empty );
                value_type * pVal = refTop.m_pVal;
                refTop.m_pVal = nullptr;
                refTop.unlock();
                m_Lock.unlock();
                m_Stat.onPopSuccess();
                return pVal;
            }

            // Take the item out of the bottom node; the top node stays locked.
            // The bottom node lock is taken before the size lock is released.
            node& refBottom = m_Heap[nBottom];
            refBottom.lock();
            m_Lock.unlock();
            refBottom.m_nTag = tag_type(Empty);
            value_type * pVal = refBottom.m_pVal;
            refBottom.m_pVal = nullptr;
            refBottom.unlock();

            if ( refTop.m_nTag == tag_type(Empty)) {
                // nBottom == nTop
                // The top node holds no item: the item taken from the bottom node is the result
                refTop.unlock();
                m_Stat.onPopSuccess();
                return pVal;
            }

            // Put the former bottom item to the top; pVal now points to the former top item
            // that is returned to the caller
            std::swap( refTop.m_pVal, pVal );
            refTop.m_nTag = tag_type( Available );

            // refTop will be unlocked inside heapify_after_pop
            heapify_after_pop( &refTop );

            m_Stat.onPopSuccess();
            return pVal;
        }

        /// Clears the queue (not atomic)
        /**
            This function is not atomic, but thread-safe.

            The items are removed by calling \p pop() until it returns \p nullptr.
        */
        void clear()
        {
            clear_with( []( value_type const& /*src*/ ) {} );
        }

        /// Clears the queue (not atomic)
        /**
            This function is not atomic, but thread-safe.

            For each item removed the functor \p f is called.
            \p Func interface is:
            \code
                struct clear_functor
                {
                    void operator()( value_type& item );
                };
            \endcode
            A lambda function or a function pointer can be used as \p f.
        */
        template <typename Func>
        void clear_with( Func f )
        {
            value_type * pVal;
            while (( pVal = pop()) != nullptr )
                f( *pVal );
        }

        /// Checks if the priority queue is empty
        bool empty() const
        {
            return size() == 0;
        }

        /// Checks if the priority queue is full
        bool full() const
        {
            return size() == capacity();
        }

        /// Returns current size of priority queue
        /**
            The item counter is read under the heap's size lock.
        */
        size_t size() const
        {
            std::unique_lock<lock_type> l( m_Lock );
            return static_cast<size_t>( m_ItemCounter.value());
        }

        /// Return capacity of the priority queue
        size_t capacity() const
        {
            // m_Heap[0] is not used
            return m_Heap.capacity() - 1;
        }

        /// Returns const reference to internal statistics
        stat const& statistics() const
        {
            return m_Stat;
        }

    protected:
        //@cond

        // Moves the item inserted by push() from slot i towards the top of the heap.
        // curId is the tag that push() assigned to the item (the id of the inserting thread).
        // On each step the parent node is locked first, then the item node.
        void heapify_after_push( counter_type i, tag_type curId )
        {
            key_comparator  cmp;
            back_off        bkoff;

            // Move item towards top of the heap while it has higher priority than parent
            while ( i > 1 ) {
                bool bProgress = true;
                counter_type nParent = i / 2;
                node& refParent = m_Heap[nParent];
                refParent.lock();
                node& refItem = m_Heap[i];
                refItem.lock();

                if ( refParent.m_nTag == tag_type(Available) && refItem.m_nTag == curId ) {
                    // The parent is stable and the item is still in slot i
                    if ( cmp( *refItem.m_pVal, *refParent.m_pVal ) > 0 ) {
                        // The item has higher priority than its parent: exchange them
                        // (values and tags) and continue from the parent slot
                        std::swap( refItem.m_nTag, refParent.m_nTag );
                        std::swap( refItem.m_pVal, refParent.m_pVal );
                        m_Stat.onPushHeapifySwap();
                        i = nParent;
                    }
                    else {
                        // The item is in its place: mark it as available.
                        // i = 0 stops the loop and skips the final check of the top node.
                        refItem.m_nTag = tag_type(Available);
                        i = 0;
                    }
                }
                else if ( refParent.m_nTag == tag_type( Empty )) {
                    // The parent is empty: counted as "the item was moved to top
                    // by a concurrent pop()", nothing more to do
                    m_Stat.onItemMovedTop();
                    i = 0;
                }
                else if ( refItem.m_nTag != curId ) {
                    // Slot i does not hold our item anymore: counted as "the item was moved
                    // upwards by a concurrent pop()", continue the search from the parent slot
                    m_Stat.onItemMovedUp();
                    i = nParent;
                }
                else {
                    // The item is still ours but the parent's tag is neither Available nor Empty
                    // (presumably the parent is being moved by another inserting thread):
                    // no progress at this pass, retry after back-off
                    m_Stat.onPushEmptyPass();
                    bProgress = false;
                }

                // Unlock in reverse order of locking
                refItem.unlock();
                refParent.unlock();

                if ( !bProgress )
                    bkoff();
                else
                    bkoff.reset();
            }

            if ( i == 1 ) {
                // The top of the heap is reached: if the top node still carries
                // our tag, mark it as available
                node& refItem = m_Heap[i];
                refItem.lock();
                if ( refItem.m_nTag == curId )
                    refItem.m_nTag = tag_type(Available);
                refItem.unlock();
            }
        }

        // Moves the item placed to the top of the heap by pop() downwards
        // while one of its children has higher priority.
        // pParent points to the top node that must be locked by the caller (see pop());
        // the function unlocks it. While descending, a child node is locked
        // before the lock of its parent is released.
        void heapify_after_pop( node * pParent )
        {
            key_comparator cmp;
            counter_type const nCapacity = m_Heap.capacity();

            // nParent tracks the index of the node pParent points to;
            // it is read only to initialize nChild.
            // nChild is doubled on each step, so the next pass starts
            // from the left child of the node chosen at this pass.
            counter_type nParent = 1;
            for ( counter_type nChild = nParent * 2; nChild < nCapacity; nChild *= 2 ) {
                // Start with the left child
                node* pChild = &m_Heap[ nChild ];
                pChild->lock();

                if ( pChild->m_nTag == tag_type( Empty )) {
                    // The left child is empty: stop descending
                    pChild->unlock();
                    break;
                }

                counter_type const nRight = nChild + 1;
                if ( nRight < nCapacity ) {
                    node& refRight = m_Heap[nRight];
                    refRight.lock();

                    // Choose the right child if it is not empty and has higher priority than the left one;
                    // only the chosen child remains locked
                    if ( refRight.m_nTag != tag_type( Empty ) && cmp( *refRight.m_pVal, *pChild->m_pVal ) > 0 ) {
                        // get right child
                        pChild->unlock();
                        nChild = nRight;
                        pChild = &refRight;
                    }
                    else
                        refRight.unlock();
                }

                // If child has higher priority than parent then swap
                // Otherwise stop
                if ( cmp( *pChild->m_pVal, *pParent->m_pVal ) > 0 ) {
                    std::swap( pParent->m_nTag, pChild->m_nTag );
                    std::swap( pParent->m_pVal, pChild->m_pVal );
                    // The child stays locked and becomes the parent of the next pass
                    pParent->unlock();
                    m_Stat.onPopHeapifySwap();
                    nParent = nChild;
                    pParent = pChild;
                }
                else {
                    pChild->unlock();
                    break;
                }
            }
            // Release the node where the descent has stopped
            pParent->unlock();
        }
        //@endcond
    };

}} // namespace cds::intrusive

#endif // #ifndef CDSLIB_INTRUSIVE_MSPRIORITY_QUEUE_H