File: channel.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (616 lines) | stat: -rw-r--r-- 26,384 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
/**
 *  Class describing a (mid-level) AMQP channel implementation
 *
 *  @copyright 2014 - 2023 Copernica BV
 */

/**
 *  Include guard
 */
#pragma once

/**
 *  Set up namespace
 */
namespace AMQP {

/**
 *  Class definition
 *
 *  A Channel is a thin, move-only handle: every member function forwards
 *  to the ChannelImpl object that is held in a shared pointer. Destructing
 *  the handle closes the channel. Once a Channel has been moved from it no
 *  longer holds an implementation; the member functions do not check for
 *  this, so the only thing that may be done with a moved-from Channel is
 *  destructing it.
 */
class Channel
{
private:
    /**
     *  The implementation for the channel (empty after this object was moved from)
     *  @var    std::shared_ptr<ChannelImpl>
     */
    std::shared_ptr<ChannelImpl> _implementation;

public:
    /**
     *  Construct a channel object
     *
     *  The passed in connection pointer must remain valid for the
     *  lifetime of the channel. Watch out: this method throws an error
     *  if the channel could not be constructed (for example because the
     *  max number of AMQP channels has been reached)
     *
     *  @param  connection
     *  @throws std::runtime_error
     */
    Channel(Connection *connection);

    /**
     *  Copying of channel objects is not supported
     *  @param  channel
     */
    Channel(const Channel &channel) = delete;

    /**
     *  But movement _is_ allowed
     *
     *  The implementation pointer is taken over from the other channel, which
     *  is left without an implementation. Moving a shared pointer can not
     *  throw, hence the noexcept.
     *
     *  @param  channel
     */
    Channel(Channel &&channel) noexcept : _implementation(std::move(channel._implementation)) {}

    /**
     *  Destructor
     */
    virtual ~Channel()
    {
        // close the channel (this will eventually destruct the channel)
        // note that the channel may be in an invalid state in case it was moved, hence the "if"
        if (_implementation) _implementation->close();
    }

    /**
     *  No assignments of other channels
     *  @param  channel
     *  @return Channel
     */
    Channel &operator=(const Channel &channel) = delete;

    /**
     *  Callback that is called when the channel was successfully created.
     *
     *  Only one callback can be registered. Calling this function multiple
     *  times will remove the old callback.
     *
     *  The const-reference overload copies the callback and passes the copy
     *  on to the rvalue overload.
     *
     *  @param  callback    the callback to execute
     */
    void onReady(const SuccessCallback& callback) { onReady(SuccessCallback(callback)); }
    void onReady(SuccessCallback&& callback)
    {
        _implementation->onReady(std::move(callback));
    }

    /**
     *  Callback that is called when an error occurs.
     *
     *  Only one error callback can be registered. Calling this function
     *  multiple times will remove the old callback.
     *
     *  The const-reference overload copies the callback and passes the copy
     *  on to the rvalue overload.
     *
     *  @param  callback    the callback to execute
     */
    void onError(const ErrorCallback& callback) { onError(ErrorCallback(callback)); }
    void onError(ErrorCallback&& callback)
    {
        _implementation->onError(std::move(callback));
    }

    /**
     *  Pause deliveries on a channel
     *
     *  This will stop all incoming messages
     *
     *  Note that this function does *not* work using RabbitMQ. For more info
     *  @see https://www.rabbitmq.com/specification.html#method-status-channel.flow
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &pause()
    {
        return _implementation->pause();
    }

    /**
     *  Resume a paused channel
     *
     *  This will resume incoming messages
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &resume()
    {
        return _implementation->resume();
    }

    /**
     *  Is the channel ready / has it passed the initial handshake?
     *  @return bool
     */
    bool ready() const
    {
        return _implementation->ready();
    }

    /**
     *  Is the channel usable / not yet closed?
     *  @return bool
     */
    bool usable() const
    {
        return _implementation->usable();
    }

    /**
     *  Is the channel connected?
     *  This method is deprecated: use Channel::usable(), which returns
     *  exactly the same value
     *  @return bool
     *  @deprecated
     */
    bool connected() const
    {
        return usable();
    }

    /**
     *  Put channel in a confirm mode (RabbitMQ specific)
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    DeferredConfirm &confirmSelect()
    {
        return _implementation->confirmSelect();
    }

    /**
     *  Start a transaction
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &startTransaction()
    {
        return _implementation->startTransaction();
    }

    /**
     *  Commit the current transaction
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &commitTransaction()
    {
        return _implementation->commitTransaction();
    }

    /**
     *  Rollback the current transaction
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &rollbackTransaction()
    {
        return _implementation->rollbackTransaction();
    }

    /**
     *  Declare an exchange
     *
     *  If an empty name is supplied, a name will be assigned by the server.
     *
     *  The following flags can be used for the exchange:
     *
     *      -   durable     exchange survives a broker restart
     *      -   autodelete  exchange is automatically removed when all connected queues are removed
     *      -   passive     only check if the exchange exists
     *      -   internal    create an internal exchange
     *
     *  Overloads that leave out a parameter pass an empty name, the flags
     *  value 0 and/or an empty table to the implementation.
     *
     *  @param  name        name of the exchange
     *  @param  type        exchange type
     *  @param  flags       exchange flags
     *  @param  arguments   additional arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &declareExchange(const std::string_view &name, ExchangeType type, int flags, const Table &arguments) { return _implementation->declareExchange(name, type, flags, arguments); }
    Deferred &declareExchange(const std::string_view &name, ExchangeType type, const Table &arguments) { return _implementation->declareExchange(name, type, 0, arguments); }
    Deferred &declareExchange(const std::string_view &name, ExchangeType type = fanout, int flags = 0) { return _implementation->declareExchange(name, type, flags, Table()); }
    Deferred &declareExchange(ExchangeType type, int flags, const Table &arguments) { return _implementation->declareExchange(std::string_view(), type, flags, arguments); }
    Deferred &declareExchange(ExchangeType type, const Table &arguments) { return _implementation->declareExchange(std::string_view(), type, 0, arguments); }
    Deferred &declareExchange(ExchangeType type = fanout, int flags = 0) { return _implementation->declareExchange(std::string_view(), type, flags, Table()); }

    /**
     *  Remove an exchange
     *
     *  The following flags can be used for the exchange:
     *
     *      -   ifunused    only delete if no queues are connected
     *
     *  @param  name        name of the exchange to remove
     *  @param  flags       optional flags
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &removeExchange(const std::string_view &name, int flags = 0) { return _implementation->removeExchange(name, flags); }

    /**
     *  Bind two exchanges to each other
     *
     *  The overload without arguments passes an empty table.
     *
     *  @param  source      the source exchange
     *  @param  target      the target exchange
     *  @param  routingkey  the routing key
     *  @param  arguments   additional bind arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &bindExchange(const std::string_view &source, const std::string_view &target, const std::string_view &routingkey, const Table &arguments) { return _implementation->bindExchange(source, target, routingkey, arguments); }
    Deferred &bindExchange(const std::string_view &source, const std::string_view &target, const std::string_view &routingkey) { return _implementation->bindExchange(source, target, routingkey, Table()); }

    /**
     *  Unbind two exchanges from one another
     *
     *  Watch out: the parameter order differs from bindExchange(), the target
     *  exchange comes first here. The overload without arguments passes an
     *  empty table.
     *
     *  @param  target      the target exchange
     *  @param  source      the source exchange
     *  @param  routingkey  the routing key
     *  @param  arguments   additional unbind arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &unbindExchange(const std::string_view &target, const std::string_view &source, const std::string_view &routingkey, const Table &arguments) { return _implementation->unbindExchange(target, source, routingkey, arguments); }
    Deferred &unbindExchange(const std::string_view &target, const std::string_view &source, const std::string_view &routingkey) { return _implementation->unbindExchange(target, source, routingkey, Table()); }

    /**
     *  Declare a queue
     *
     *  If you do not supply a name, a name will be assigned by the server.
     *
     *  The flags can be a combination of the following values:
     *
     *      -   durable     queue survives a broker restart
     *      -   autodelete  queue is automatically removed when all connected consumers are gone
     *      -   passive     only check if the queue exists
     *      -   exclusive   the queue only exists for this connection, and is automatically removed when connection is gone
     *
     *  Overloads that leave out a parameter pass an empty name, the flags
     *  value 0 and/or an empty table to the implementation.
     *
     *  @param  name        name of the queue
     *  @param  flags       combination of flags
     *  @param  arguments   optional arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     *
     *  The onSuccess() callback that you can install should have the following signature:
     *
     *      void myCallback(const std::string &name, uint32_t messageCount, uint32_t consumerCount);
     *
     *  For example: channel.declareQueue("myqueue").onSuccess([](const std::string &name, uint32_t messageCount, uint32_t consumerCount) {
     *
     *      std::cout << "Queue '" << name << "' has been declared with " << messageCount << " messages and " << consumerCount << " consumers" << std::endl;
     *
     *  });
     */
    DeferredQueue &declareQueue(const std::string_view &name, int flags, const Table &arguments) { return _implementation->declareQueue(name, flags, arguments); }
    DeferredQueue &declareQueue(const std::string_view &name, const Table &arguments) { return _implementation->declareQueue(name, 0, arguments); }
    DeferredQueue &declareQueue(const std::string_view &name, int flags = 0) { return _implementation->declareQueue(name, flags, Table()); }
    DeferredQueue &declareQueue(int flags, const Table &arguments) { return _implementation->declareQueue(std::string_view(), flags, arguments); }
    DeferredQueue &declareQueue(const Table &arguments) { return _implementation->declareQueue(std::string_view(), 0, arguments); }
    DeferredQueue &declareQueue(int flags = 0) { return _implementation->declareQueue(std::string_view(), flags, Table()); }

    /**
     *  Bind a queue to an exchange
     *
     *  The overload without arguments passes an empty table.
     *
     *  @param  exchange    the source exchange
     *  @param  queue       the target queue
     *  @param  routingkey  the routing key
     *  @param  arguments   additional bind arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey, const Table &arguments) { return _implementation->bindQueue(exchange, queue, routingkey, arguments); }
    Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey) { return _implementation->bindQueue(exchange, queue, routingkey, Table()); }

    /**
     *  Unbind a queue from an exchange
     *
     *  The overload without arguments passes an empty table.
     *
     *  @param  exchange    the source exchange
     *  @param  queue       the target queue
     *  @param  routingkey  the routing key
     *  @param  arguments   additional unbind arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &unbindQueue(const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey, const Table &arguments) { return _implementation->unbindQueue(exchange, queue, routingkey, arguments); }
    Deferred &unbindQueue(const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey) { return _implementation->unbindQueue(exchange, queue, routingkey, Table()); }

    /**
     *  Purge a queue
     *
     *  @param  name        name of the queue
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     *
     *  The onSuccess() callback that you can install should have the following signature:
     *
     *      void myCallback(uint32_t messageCount);
     *
     *  For example: channel.purgeQueue("myqueue").onSuccess([](uint32_t messageCount) {
     *
     *      std::cout << "Queue purged, all " << messageCount << " messages removed" << std::endl;
     *
     *  });
     */
    DeferredDelete &purgeQueue(const std::string_view &name) { return _implementation->purgeQueue(name); }

    /**
     *  Remove a queue
     *
     *  The following flags can be used for the queue:
     *
     *      -   ifunused    only delete if no consumers are connected
     *      -   ifempty     only delete if the queue is empty
     *
     *  @param  name        name of the queue to remove
     *  @param  flags       optional flags
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     *
     *  The onSuccess() callback that you can install should have the following signature:
     *
     *      void myCallback(uint32_t messageCount);
     *
     *  For example: channel.removeQueue("myqueue").onSuccess([](uint32_t messageCount) {
     *
     *      std::cout << "Queue deleted, along with " << messageCount << " messages" << std::endl;
     *
     *  });
     */
    DeferredDelete &removeQueue(const std::string_view &name, int flags = 0) { return _implementation->removeQueue(name, flags); }

    /**
     *  Publish a message to an exchange
     *
     *  You have to supply the name of an exchange and a routing key. RabbitMQ will then try
     *  to send the message to one or more queues. With the optional flags parameter you can
     *  specify what should happen if the message could not be routed to a queue. By default,
     *  unroutable messages are silently discarded.
     *
     *  If you set the 'mandatory' and/or 'immediate' flag, messages that could not be handled
     *  are returned to the application. Make sure that you have called the recall()-method and
     *  have set up all appropriate handlers to process these returned messages before you start
     *  publishing.
     *
     *  The following flags can be supplied:
     *
     *      -   mandatory   If set, server returns messages that are not sent to a queue
     *      -   immediate   If set, server returns messages that can not immediately be forwarded to a consumer.
     *
     *  All overloads that do not take an envelope wrap the message bytes in an
     *  Envelope object before handing it to the implementation. The overload
     *  that takes a "const char *" without a size uses strlen() to find the
     *  length, so that message must be a valid null-terminated string.
     *
     *  @param  exchange    the exchange to publish to
     *  @param  routingKey  the routing key
     *  @param  envelope    the full envelope to send
     *  @param  message     the message to send
     *  @param  size        size of the message
     *  @param  flags       optional flags
     *  @return bool        the value returned by the publish() method of the implementation
     */
    bool publish(const std::string_view &exchange, const std::string_view &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); }
    bool publish(const std::string_view &exchange, const std::string_view &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
    bool publish(const std::string_view &exchange, const std::string_view &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); }
    bool publish(const std::string_view &exchange, const std::string_view &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }

    /**
     *  Set the Quality of Service (QOS) for this channel
     *
     *  When you consume messages, every single message needs to be ack'ed to inform
     *  the RabbitMQ server that it has been received. The Qos setting specifies the
     *  number of unacked messages that may exist in the client application. The server
     *  stops delivering more messages if the number of unack'ed messages has reached
     *  the prefetchCount
     *
     *  @param  prefetchCount       maximum number of messages to prefetch
     *  @param  global              share counter between all consumers on the same channel
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &setQos(uint16_t prefetchCount, bool global = false)
    {
        return _implementation->setQos(prefetchCount, global);
    }

    /**
     *  Tell the RabbitMQ server that we're ready to consume messages
     *
     *  After this method is called, RabbitMQ starts delivering messages to the client
     *  application. The consume tag is a string identifier that you can use to identify
     *  the consumer if you later want to stop it with a Channel::cancel() call.
     *  If you do not specify a consumer tag, the server will assign one for you.
     *
     *  The following flags are supported:
     *
     *      -   nolocal             if set, messages published on this channel are not also consumed
     *      -   noack               if set, consumed messages do not have to be acked, this happens automatically
     *      -   exclusive           request exclusive access, only this consumer can access the queue
     *
     *  Overloads that leave out a parameter pass an empty tag, the flags
     *  value 0 and/or an empty table to the implementation.
     *
     *  @param  queue               the queue from which you want to consume
     *  @param  tag                 a consumer tag that will be associated with this consume operation
     *  @param  flags               additional flags
     *  @param  arguments           additional arguments
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     *
     *  The onSuccess() callback that you can install should have the following signature:
     *
     *      void myCallback(const std::string_view& tag);
     *
     *  For example: channel.consume("myqueue").onSuccess([](const std::string_view& tag) {
     *
     *      std::cout << "Started consuming under tag " << tag << std::endl;
     *
     *  });
     */
    DeferredConsumer &consume(const std::string_view &queue, const std::string_view &tag, int flags, const Table &arguments) { return _implementation->consume(queue, tag, flags, arguments); }
    DeferredConsumer &consume(const std::string_view &queue, const std::string_view &tag, int flags = 0) { return _implementation->consume(queue, tag, flags, Table()); }
    DeferredConsumer &consume(const std::string_view &queue, const std::string_view &tag, const Table &arguments) { return _implementation->consume(queue, tag, 0, arguments); }
    DeferredConsumer &consume(const std::string_view &queue, int flags, const Table &arguments) { return _implementation->consume(queue, std::string_view(), flags, arguments); }
    DeferredConsumer &consume(const std::string_view &queue, int flags = 0) { return _implementation->consume(queue, std::string_view(), flags, Table()); }
    DeferredConsumer &consume(const std::string_view &queue, const Table &arguments) { return _implementation->consume(queue, std::string_view(), 0, arguments); }

    /**
     *  Tell the server that you are ready to recall/take back messages that are unroutable.
     *
     *  When you use the publish() method in combination with the 'immediate' or 'mandatory' flag, rabbitmq
     *  sends back unroutable messages. With this recall() method you can install a sort of pseudo-consumer
     *  that defines how such returned-messages are processed.
     *
     *  Watch out: when you call this method more than once, you always get access to the same object. You
     *  can thus not install multiple callbacks for the same event.
     */
    DeferredRecall &recall() { return _implementation->recall(); }

    /**
     *  Cancel a running consume call
     *
     *  If you want to stop a running consumer, you can use this method with the consumer tag
     *
     *  @param  tag                 the consumer tag
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     *
     *  The onSuccess() callback that you can install should have the following signature:
     *
     *      void myCallback(const std::string& tag);
     *
     *  For example: channel.cancel("mytag").onSuccess([](const std::string& tag) {
     *
     *      std::cout << "Stopped consuming under tag " << tag << std::endl;
     *
     *  });
     */
    DeferredCancel &cancel(const std::string_view &tag) { return _implementation->cancel(tag); }

    /**
     *  Retrieve a single message from RabbitMQ
     *
     *  When you call this method, you can get one single message from the queue (or none
     *  at all if the queue is empty). The deferred object that is returned, should be used
     *  to install a onEmpty() and onSuccess() callback function that will be called
     *  when the message is consumed and/or when the message could not be consumed.
     *
     *  The following flags are supported:
     *
     *      -   noack               if set, consumed messages do not have to be acked, this happens automatically
     *
     *  @param  queue               name of the queue to consume from
     *  @param  flags               optional flags
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onEmpty(), onError() and onFinalize() methods.
     *
     *  The onSuccess() callback has the following signature:
     *
     *      void myCallback(const Message &message, uint64_t deliveryTag, bool redelivered);
     *
     *  For example: channel.get("myqueue").onSuccess([](const Message &message, uint64_t deliveryTag, bool redelivered) {
     *
     *      std::cout << "Message fetched" << std::endl;
     *
     *  }).onEmpty([]() {
     *
     *      std::cout << "Queue is empty" << std::endl;
     *
     *  });
     */
    DeferredGet &get(const std::string_view &queue, int flags = 0) { return _implementation->get(queue, flags); }

    /**
     *  Acknowledge a received message
     *
     *  When a message is received in the DeferredConsumer::onReceived() method,
     *  you must acknowledge it so that RabbitMQ removes it from the queue (unless
     *  you are consuming with the noack option). This method can be used for
     *  this acknowledging.
     *
     *  The following flags are supported:
     *
     *      -   multiple            acknowledge multiple messages: all un-acked messages that were earlier delivered are acknowledged too
     *
     *  @param  deliveryTag         the unique delivery tag of the message
     *  @param  flags               optional flags
     *  @return bool
     */
    bool ack(uint64_t deliveryTag, int flags = 0) { return _implementation->ack(deliveryTag, flags); }

    /**
     *  Reject or nack a message
     *
     *  When a message was received in the DeferredConsumer::onReceived() method,
     *  and you don't want to acknowledge it, you can also choose to reject it by
     *  calling this reject method.
     *
     *  The following flags are supported:
     *
     *      -   multiple            reject multiple messages: all un-acked messages that were earlier delivered are rejected too
     *      -   requeue             if set, the message is put back in the queue, otherwise it is dead-lettered/removed
     *
     *  @param  deliveryTag         the unique delivery tag of the message
     *  @param  flags               optional flags
     *  @return bool
     */
    bool reject(uint64_t deliveryTag, int flags = 0) { return _implementation->reject(deliveryTag, flags); }

    /**
     *  Recover all messages that were not yet acked
     *
     *  This method asks the server to redeliver all unacknowledged messages on a specified
     *  channel. Zero or more messages may be redelivered.
     *
     *  The following flags are supported:
     *
     *      -   requeue             if set, the server will requeue the messages, so they could also end up at a different consumer
     *
     *  @param  flags
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &recover(int flags = 0) { return _implementation->recover(flags); }

    /**
     *  Close the current channel
     *
     *  This function returns a deferred handler. Callbacks can be installed
     *  using onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &close() { return _implementation->close(); }

    /**
     *  Get the channel we're working on
     *  @return uint16_t
     */
    uint16_t id() const
    {
        return _implementation->id();
    }

    /**
     *  Some internal classes may touch our implementation
     */
    friend class Tagger;
};

/**
 *  end namespace
 */
}