1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
/**
* DeferredPublish.h
*
* Deferred callback for RabbitMQ-specific publisher confirms mechanism per-message.
*
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
* @copyright 2020 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Set up namespace
*/
namespace AMQP {
/**
* We extend from the default deferred and add extra functionality
*/
class DeferredPublish : public Deferred
{
private:
/**
* Callback to execute when server confirms that message is processed
* @var AckCallback
*/
PublishAckCallback _ackCallback;
/**
* Callback to execute when server sends negative acknowledgement
* @var NackCallback
*/
PublishNackCallback _nackCallback;
/**
* Callback to execute when message is lost (nack / error)
* @var LostCallback
*/
PublishLostCallback _lostCallback;
/**
* Report an ack, calls the callback.
*/
void reportAck()
{
// check if the callback is set
if (_ackCallback) _ackCallback();
}
/**
* Report an nack, calls the callback if set.
*/
void reportNack()
{
// check if the callback is set
if (_nackCallback) _nackCallback();
// message is 'lost'
if (_lostCallback) _lostCallback();
}
/**
* Indicate failure
* @param error Description of the error that occured
*/
void reportError(const char *error)
{
// from this moment on the object should be listed as failed
_failed = true;
// message is lost
if (_lostCallback) _lostCallback();
// execute callbacks if registered
if (_errorCallback) _errorCallback(error);
}
/**
* The wrapped confirmed channel implementation may call our
* private members and construct us
*/
template <class T>
friend class Reliable;
public:
/**
* Protected constructor that can only be called
* from within the channel implementation
*
* Note: this constructor _should_ be protected, but because make_shared
* will then not work, we have decided to make it public after all,
* because the work-around would result in not-so-easy-to-read code.
*
* @param boolean are we already failed?
*/
DeferredPublish(bool failed = false) : Deferred(failed) {}
public:
/**
* Callback that is called when the broker confirmed message publication
* @param callback the callback to execute
*/
inline DeferredPublish &onAck(const PublishAckCallback& callback) { return onAck(PublishAckCallback(callback)); }
DeferredPublish &onAck(PublishAckCallback&& callback)
{
// store callback
_ackCallback = std::move(callback);
// allow chaining
return *this;
}
/**
* Callback that is called when the broker denied message publication
* @param callback the callback to execute
*/
inline DeferredPublish &onNack(const PublishNackCallback& callback) { return onNack(PublishNackCallback(callback)); }
DeferredPublish &onNack(PublishNackCallback&& callback)
{
// store callback
_nackCallback = std::move(callback);
// allow chaining
return *this;
}
/**
* Callback that is called when a message is lost, either through RabbitMQ
* rejecting it or because of a channel error
* @param callback the callback to execute
*/
inline DeferredPublish &onLost(const PublishLostCallback& callback) { return onLost(PublishLostCallback(callback)); }
DeferredPublish &onLost(PublishLostCallback&& callback)
{
// store callback
_lostCallback = std::move(callback);
// allow chaining
return *this;
}
};
/**
* End namespace
*/
}
|