File: channelimpl.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (803 lines) | stat: -rw-r--r-- 24,870 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
/**
 *  ChannelImpl.h
 *
 *  Extended channel object that is used internally by the library, but
 *  that has a private constructor so that it can not be used from outside
 *  the AMQP library
 *
 *  @copyright 2014 - 2023 Copernica BV
 */

/**
 *  Include guard
 */
#pragma once

/**
 *  Dependencies
 */
#include "exchangetype.h"
#include "watchable.h"
#include "callbacks.h"
#include "copiedbuffer.h"
#include "deferred.h"
#include "monitor.h"
#include <memory>
#include <queue>
#include <map>

/**
 *  Set up namespace
 */
namespace AMQP {

/**
 *  Forward declarations
 */
class DeferredReceiver;
class BasicDeliverFrame;
class DeferredConsumer;
class BasicGetOKFrame;
class ConsumedMessage;
class ConnectionImpl;
class DeferredDelete;
class DeferredCancel;
class DeferredConfirm;
class DeferredQueue;
class DeferredGet;
class DeferredRecall;
class Connection;
class Envelope;
class Table;
class Frame;

/**
 *  Class definition
 */
class ChannelImpl : public Watchable, public std::enable_shared_from_this<ChannelImpl>
{
private:
    /**
     *  Pointer to the connection
     *  @var    ConnectionImpl
     */
    ConnectionImpl *_connection = nullptr;

    /**
     *  Callback when the channel is ready
     *  @var    SuccessCallback
     */
    SuccessCallback _readyCallback;

    /**
     *  Callback when the channel errors out
     *  @var    ErrorCallback
     */
    ErrorCallback _errorCallback;

    /**
     *  Handler that deals with incoming messages as a result of publish operations
     *  @var    DeferredRecall
     */
    std::shared_ptr<DeferredRecall> _recall;

    /**
     *  Handler that deals with publisher confirms frames
     *  @var    std::shared_ptr<DeferredConfirm>
     */
    std::shared_ptr<DeferredConfirm> _confirm;

    /**
     *  Handlers for all consumers that are active
     *  @var    std::map<std::string,std::shared_ptr<DeferredConsumer>>
     */
    std::map<std::string,std::shared_ptr<DeferredConsumer>> _consumers;

    /**
     *  Pointer to the oldest deferred result (the first one that is going
     *  to be executed)
     *
     *  @var    Deferred
     */
    std::shared_ptr<Deferred> _oldestCallback;

    /**
     *  Pointer to the newest deferred result (the last one to be added).
     *
     *  @var    Deferred
     */
    std::shared_ptr<Deferred> _newestCallback;

    /**
     *  The channel number
     *  @var uint16_t
     */
    uint16_t _id = 0;

    /**
     *  State of the channel object
     *  @var enum
     */
    enum {
        state_connected,
        state_ready,
        state_closing,
        state_closed
    } _state = state_closed;

    /**
     *  The frames that still need to be sent out
     *
     *  We store the data as well as whether they
     *  should be handled synchronously.
     *
     *  @var std::queue
     */
    std::queue<CopiedBuffer> _queue;

    /**
     *  Are we currently operating in synchronous mode? Meaning: do we first have
     *  to wait for the answer to previous instructions before we send a new instruction?
     *  @var bool
     */
    bool _synchronous = false;

    /**
     *  The current object that is busy receiving a message
     *  @var std::shared_ptr<DeferredReceiver>
     */
    std::shared_ptr<DeferredReceiver> _receiver;

    /**
     *  Attach the connection
     *  @param  connection
     *  @return bool
     */
    bool attach(Connection *connection);

    /**
     *  Push a deferred result
     *  @param  deferred        The deferred result
     *  @return Deferred        The object just pushed
     */
    Deferred &push(const std::shared_ptr<Deferred> &deferred);

    /**
     *  Send a frame and push a deferred result
     *  @param  frame           The frame to send
     *  @return Deferred        The object just pushed
     */
    Deferred &push(const Frame &frame);

protected:
    /**
     *  Construct a channel object
     *
     *  Note that the constructor is protected, and that the Channel class is
     *  a friend. By doing this we ensure that nobody can instantiate this
     *  object, and that it can thus only be used inside the library.
     */
    ChannelImpl();

public:
    /**
     *  Copying of channel objects is not supported
     *  @param  channel
     */
    ChannelImpl(const ChannelImpl &channel) = delete;

    /**
     *  Destructor
     */
    virtual ~ChannelImpl();

    /**
     *  No assignments of other channels
     *  @param  channel
     *  @return Channel
     */
    ChannelImpl &operator=(const ChannelImpl &channel) = delete;

    /**
     *  Invalidate the channel
     *  This method is called when the connection is destructed
     */
    void detach()
    {
        // connection is gone: forget our pointer to it (the channel
        // state in _state is not changed by this method)
        _connection = nullptr;
    }
    
    /**
     *  Expose the currently installed callbacks
     *  @return The stored callback (an ErrorCallback for onError(), a SuccessCallback for onReady())
     */
    const ErrorCallback &onError() const
    {
        // hand out a read-only reference to the stored error callback
        return _errorCallback;
    }
    const SuccessCallback &onReady() const
    {
        // hand out a read-only reference to the stored ready callback
        return _readyCallback;
    }
    

    /**
     *  Callback that is called when the channel was successfully created.
     *  @param  callback    the callback to execute
     */
    inline void onReady(const SuccessCallback& callback) { return onReady(SuccessCallback(callback)); }
    void onReady(SuccessCallback&& callback)
    {
        // take over the callback, this replaces a callback that was set earlier
        _readyCallback = std::move(callback);

        // we are done unless the channel already is ready and a callback was really set
        if (_state != state_ready || !_readyCallback) return;

        // the ready-event already happened, so report it right away
        _readyCallback();
    }

    /**
     *  Callback that is called when an error occurs.
     *
     *  Only one error callback can be registered. Calling this function
     *  multiple times will remove the old callback.
     *
     *  @param  callback    the callback to execute
     */
    inline void onError(const ErrorCallback& callback) { return onError(ErrorCallback(callback)); }
    void onError(ErrorCallback&& callback);

    /**
     *  Pause deliveries on a channel
     *
     *  This will stop all incoming messages
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &pause();

    /**
     *  Resume a paused channel
     *
     *  This will resume incoming messages
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &resume();

    /**
     *  Is the channel usable / not yet closed?
     *  @return bool
     */
    bool usable() const
    {
        // only the connected and the ready state count as usable, in
        // the closing and closed state the channel is not usable
        switch (_state) {
        case state_connected:
        case state_ready:
            return true;
        default:
            return false;
        }
    }

    /**
     *  Is the channel ready / has it passed the initial handshake?
     *  @return bool
     */
    bool ready() const
    {
        // the ready state is the only state that qualifies
        const bool isReady = (_state == state_ready);

        // done
        return isReady;
    }

    /**
     *  Put channel in a confirm mode (RabbitMQ specific)
     */
    DeferredConfirm &confirmSelect();

    /**
     *  Start a transaction
     */
    Deferred &startTransaction();

    /**
     *  Commit the current transaction
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &commitTransaction();

    /**
     *  Rollback the current transaction
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &rollbackTransaction();

    /**
     *  declare an exchange
     *
     *  @param  name        name of the exchange to declare
     *  @param  type        type of exchange
     *  @param  flags       additional settings for the exchange
     *  @param  arguments   additional arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &declareExchange(const std::string_view &name, ExchangeType type, int flags, const Table &arguments);

    /**
     *  bind two exchanges
     *
     *  @param  source      exchange which binds to target
     *  @param  target      exchange to bind to
     *  @param  routingkey  routing key
     *  @param  arguments   additional arguments for binding
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &bindExchange(const std::string_view &source, const std::string_view &target, const std::string_view &routingkey, const Table &arguments);

    /**
     *  unbind two exchanges
     *
     *  @param  source      the source exchange
     *  @param  target      the target exchange
     *  @param  routingkey  the routing key
     *  @param  arguments   additional unbind arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &unbindExchange(const std::string_view &source, const std::string_view &target, const std::string_view &routingkey, const Table &arguments);

    /**
     *  remove an exchange
     *
     *  @param  name        name of the exchange to remove
     *  @param  flags       additional settings for deleting the exchange
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &removeExchange(const std::string_view &name, int flags);

    /**
     *  declare a queue
     *  @param  name        queue name
     *  @param  flags       additional settings for the queue
     *  @param  arguments   additional arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    DeferredQueue &declareQueue(const std::string_view &name, int flags, const Table &arguments);

    /**
     *  Bind a queue to an exchange
     *
     *  @param  exchangeName    name of the exchange to bind to
     *  @param  queueName       name of the queue
     *  @param  routingkey      routingkey
     *  @param  arguments       additional arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &bindQueue(const std::string_view &exchangeName, const std::string_view &queueName, const std::string_view &routingkey, const Table &arguments);

    /**
     *  Unbind a queue from an exchange
     *
     *  @param  exchangeName    the source exchange
     *  @param  queueName       the target queue
     *  @param  routingkey      the routing key
     *  @param  arguments       additional unbind arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &unbindQueue(const std::string_view &exchangeName, const std::string_view &queueName, const std::string_view &routingkey, const Table &arguments);

    /**
     *  Purge a queue
     *  @param  name        queue to purge
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     *
     *  The onSuccess() callback that you can install should have the following signature:
     *
     *      void myCallback(AMQP::Channel *channel, uint32_t messageCount);
     *
     *  For example: channel.purgeQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
     *
     *      std::cout << "Queue purged, all " << messageCount << " messages removed" << std::endl;
     *
     *  });
     */
    DeferredDelete &purgeQueue(const std::string_view &name);

    /**
     *  Remove a queue
     *  @param  name        queue to remove
     *  @param  flags       additional flags
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     *
     *  The onSuccess() callback that you can install should have the following signature:
     *
     *      void myCallback(AMQP::Channel *channel, uint32_t messageCount);
     *
     *  For example: channel.removeQueue("myqueue").onSuccess([](AMQP::Channel *channel, uint32_t messageCount) {
     *
     *      std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl;
     *
     *  });
     */
    DeferredDelete &removeQueue(const std::string_view &name, int flags);

    /**
     *  Publish a message to an exchange
     *
     *  If the mandatory or immediate flag is set, and the message could not immediately
     *  be published, the message will be returned to the client.
     *
     *  @param  exchange    the exchange to publish to
     *  @param  routingKey  the routing key
     *  @param  envelope    the full envelope to send
     *  @param  flags       optional flags
     *  @return bool
     */
    bool publish(const std::string_view &exchange, const std::string_view &routingKey, const Envelope &envelope, int flags);

    /**
     *  Set the Quality of Service (QOS) of the entire connection
     *  @param  prefetchCount       maximum number of messages to prefetch
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     *
     *  @param  global      share count between all consumers on the same channel
     */
    Deferred &setQos(uint16_t prefetchCount, bool global = false);

    /**
     *  Tell the RabbitMQ server that we're ready to consume messages
     *  @param  queue               the queue from which you want to consume
     *  @param  tag                 a consumer tag that will be associated with this consume operation
     *  @param  flags               additional flags
     *  @param  arguments           additional arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     *
     *  The onSuccess() callback that you can install should have the following signature:
     *
     *      void myCallback(AMQP::Channel *channel, const std::string& tag);
     *
     *  For example: channel.consume("myqueue").onSuccess([](AMQP::Channel *channel, const std::string& tag) {
     *
     *      std::cout << "Started consuming under tag " << tag << std::endl;
     *
     *  });
     */
    DeferredConsumer& consume(const std::string_view &queue, const std::string_view &tag, int flags, const Table &arguments);

    /**
     *  Tell that you are prepared to recall/take back messages that could not be
     *  published. This is only meaningful if you pass the 'immediate' or 'mandatory'
     *  flag to publish() operations.
     * 
     *  This function returns a deferred handler more or less similar to the object
     *  returned by the consume() method and that can be used to install callbacks that
     *  handle the recalled messages.
     */
    DeferredRecall &recall();

    /**
     *  Cancel a running consumer
     *  @param  tag                 the consumer tag
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     *
     *  The onSuccess() callback that you can install should have the following signature:
     *
     *      void myCallback(const std::string& tag);
     *
     *  For example: channel.cancel("mytag").onSuccess([](const std::string& tag) {
     *
     *      std::cout << "Stopped consuming under tag " << tag << std::endl;
     *
     *  });
     */
    DeferredCancel &cancel(const std::string_view &tag);

    /**
     *  Retrieve a single message from RabbitMQ
     *
     *  When you call this method, you can get one single message from the queue (or none
     *  at all if the queue is empty). The deferred object that is returned, should be used
     *  to install a onEmpty() and onSuccess() callback function that will be called
     *  when the message is consumed and/or when the message could not be consumed.
     *
     *  The following flags are supported:
     *
     *      -   noack               if set, consumed messages do not have to be acked, this happens automatically
     *
     *  @param  queue               name of the queue to consume from
     *  @param  flags               optional flags
     *
     *  The object returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onEmpty(), onError() and onFinalize() methods.
     *
     *  The onSuccess() callback has the following signature:
     *
     *      void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered);
     *
     *  For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) {
     *
     *      std::cout << "Message fetched" << std::endl;
     *
     *  }).onEmpty([]() {
     *
     *      std::cout << "Queue is empty" << std::endl;
     *
     *  });
     */
    DeferredGet &get(const std::string_view &queue, int flags = 0);

    /**
     *  Acknowledge a message
     *  @param  deliveryTag         the delivery tag
     *  @param  flags               optional flags
     *  @return bool
     */
    bool ack(uint64_t deliveryTag, int flags);

    /**
     *  Reject a message
     *  @param  deliveryTag         the delivery tag
     *  @param  flags               optional flags
     *  @return bool
     */
    bool reject(uint64_t deliveryTag, int flags);
    
    /**
     *  Recover messages that were not yet ack'ed
     *  @param  flags               optional flags
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &recover(int flags);

    /**
     *  Close the current channel
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &close();

    /**
     *  Get the channel we're working on
     *  @return uint16_t
     */
    uint16_t id() const
    {
        // the channel number as it is stored in the _id member
        return _id;
    }

    /**
     *  Send a frame over the channel
     *  @param  frame       frame to send
     *  @return bool        was frame successfully sent?
     */
    bool send(CopiedBuffer &&frame);

    /**
     *  Send a frame over the channel
     *  @param  frame       frame to send
     *  @return bool        was frame successfully sent?
     */
    bool send(const Frame &frame);

    /**
     *  Is this channel waiting for an answer before it can send further instructions
     *  @return bool
     */
    bool waiting() const
    {
        // in synchronous mode the answer to an earlier instruction is still pending
        if (_synchronous) return true;

        // otherwise we are only waiting when there are frames left in the queue
        return !_queue.empty();
    }

    /**
     *  The max payload size for frames
     *  @return uint32_t
     */
    uint32_t maxPayload() const;

    /**
     *  Signal the channel that a synchronous operation was completed, and that any
     *  queued frames can be sent out.
     *  @return false if an error on the connection level occurred, true if not
     */
    bool flush();

    /**
     *  Report to the handler that the channel is opened
     */
    void reportReady()
    {
        // if we are still in connected state we are now ready
        if (_state == state_connected) _state = state_ready;
        
        // send out more instructions if there is a queue
        flush();

        // inform handler
        if (_readyCallback) _readyCallback();
    }

    /**
     *  Report to the handler that the channel is closed
     *
     *  Returns whether the channel object is still valid
     *
     *  @return bool
     */
    bool reportClosed()
    {
        // change state
        _state = state_closed;

        // create a monitor, because the callbacks could destruct the current object
        Monitor monitor(this);

        // and pass on to the reportSuccess() method which will call the
        // appropriate deferred object to report the successful operation
        bool result = reportSuccess();

        // leap out if object no longer exists
        if (!monitor.valid()) return result;

        // all later deferred objects should report an error, because it
        // was not possible to complete the instruction as the channel is
        // now closed (but the channel onError does not have to run)
        reportError("Channel has been closed", false);

        // done
        return result;
    }

    /**
     *  Report success
     *
     *  Returns whether the channel object is still valid
     *
     *  @param  mixed
     *  @return bool
     */
    template <typename... Arguments>
    bool reportSuccess(Arguments ...parameters)
    {
        // skip if there is no oldest callback
        if (!_oldestCallback) return true;

        // we are going to call callbacks that could destruct the channel
        Monitor monitor(this);

        // flush the queue, which will send the next operation if the current operation was synchronous
        flush();

        // the call to flush may have resulted in a call to reportError
        if (!monitor.valid()) return false;

        // copy the callback (so that it will not be destructed during
        // the "reportSuccess" call, if the channel is destructed during the call)
        auto cb = _oldestCallback;

        // the call to flush might have caused the callback to have been invoked; check once more
        if (!cb) return true;

        // call the callback
        auto next = cb->reportSuccess(std::forward<Arguments>(parameters)...);

        // leap out if channel no longer exist
        if (!monitor.valid()) return false;
        
        // in case the callback-shared-pointer is still kept in scope (for example because it
        // is stored in the list of consumers), we do want to ensure that it no longer maintains
        // a chain of queued deferred objects
        cb->unchain();

        // set the oldest callback
        _oldestCallback = next;

        // if there was no next callback, the newest callback was just used
        if (!next) _newestCallback = nullptr;

        // we are still valid
        return true;
    }

    /**
     *  Report that a consumer was cancelled by the server (for example because the 
     *  queue was removed or the node on which the queue was stored was terminated)
     *  @param  tag                 the consumer tag
     */
    void reportCancelled(const std::string &tag);

    /**
     *  Report an error message on a channel
     *  @param  message             the error message
     *  @param  notifyhandler       should the channel-wide handler also be called?
     */
    void reportError(const char *message, bool notifyhandler = true);

    /**
     *  Install a consumer
     *  @param  consumertag     The consumer tag
     *  @param  consumer        The consumer object
     */
    void install(const std::string &consumertag, const std::shared_ptr<DeferredConsumer> &consumer)
    {
        // get the slot for this tag (it is created if it did not exist yet)
        auto &slot = _consumers[consumertag];

        // and let it refer to the consumer, a handler that was installed
        // under the same tag before is replaced
        slot = consumer;
    }

    /**
     *  Install the current receiver
     *  @param  receiver        The receiver object
     */
    void install(const std::shared_ptr<DeferredReceiver> &receiver)
    {
        // store object as current receiver: the shared pointer is copied, so the
        // channel shares ownership, and an earlier installed receiver is replaced
        _receiver = receiver;
    }

    /**
     *  Uninstall a consumer callback
     *  @param  consumertag     The consumer tag
     */
    void uninstall(const std::string &consumertag)
    {
        // look up the handler that was installed under this tag
        const auto iter = _consumers.find(consumertag);

        // forget about it (for an unknown tag there is nothing to do)
        if (iter != _consumers.end()) _consumers.erase(iter);
    }

    /**
     *  Fetch the receiver for a specific consumer tag
     *  @param  consumertag the consumer tag
     *  @return             the receiver object
     */
    DeferredConsumer *consumer(const std::string &consumertag) const;

    /**
     *  Retrieve the current object that is receiving a message
     *  @return The handler responsible for the current message
     */
    DeferredReceiver *receiver() const
    {
        // expose the raw pointer, this is a nullptr when no receiver is installed
        return _receiver.get();
    }
    
    /**
     *  Retrieve the recalls-object that handles bounces
     *  @return The deferred recall object
     */
    DeferredRecall *recalls() const
    {
        // expose the raw pointer, this is a nullptr when no recall-object was set
        return _recall.get();
    }

    /**
     *  Retrieve the deferred confirm that handles publisher confirms
     *  @return The deferred confirm object
     */
    DeferredConfirm *confirm() const
    {
        // expose the raw pointer, this is a nullptr when no confirm-object was set
        return _confirm.get();
    }

    /**
     *  The Channel class is a friend, so that it can instantiate this object
     */
    friend class Channel;
};

/**
 *  End of namespace
 */
}