1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
/**
* Tagger.h
*
* Base class that enables publisher confirms and keeps track of the sent
* messages. You can wrap this class around a AMQP::Channel object and use
* this object for publishing instead. This is a base class that you cannot
* use directly. You should instead use:
*
* - Throttle: to throttle traffic to prevent flooding RabbitMQ
* - Reliable<Tagger>: to be notified about publish-confirms via callbacks
* - Reliable<Throttle>: to have throttle + notifications via callbacks
*
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
* @copyright 2020 - 2023 Copernica BV
*/
/**
* Header guard
*/
#pragma once
/**
* Includes
*/
#include "deferredpublish.h"
#include <memory>
/**
* Begin of namespaces
*/
namespace AMQP {
/**
* Class definition
*/
class Tagger : public Watchable
{
protected:
/**
* The implementation for the channel
* @var std::shared_ptr<ChannelImpl>
*/
std::shared_ptr<ChannelImpl> _implementation;
/**
* Current id, always starts at 1.
* @var uint64_t
*/
uint64_t _current = 1;
/**
* Deferred to set up on the close
* @var std::shared_ptr<Deferred>
*/
std::shared_ptr<Deferred> _close;
/**
* Callback to call when an error occurred
* @var ErrorCallback
*/
ErrorCallback _errorCallback;
protected:
/**
* Send method for a frame
* @param id
* @param frame
*/
virtual bool send(uint64_t id, const Frame &frame);
/**
* Method that is called to report an error.
* @param message
*/
virtual void reportError(const char *message);
/**
* Method that gets called on ack/nack. If these methods are overridden, make sure
* to also call the base class methods.
* @param deliveryTag
* @param multiple
*/
virtual void onAck(uint64_t deliveryTag, bool multiple);
virtual void onNack(uint64_t deliveryTag, bool multiple);
public:
/**
* Constructor
* @param channel
*/
Tagger(AMQP::Channel &channel);
/**
* Deleted copy constructor, deleted move constructor
* @param other
*/
Tagger(const Tagger &other) = delete;
Tagger(Tagger &&other) = delete;
/**
* Deleted copy assignment, deleted move assignment
* @param other
*/
Tagger &operator=(const Tagger &other) = delete;
Tagger &operator=(Tagger &&other) = delete;
/**
* Virtual destructor
*/
virtual ~Tagger();
/**
* Method to check how many messages are still unacked.
* @return size_t
*/
virtual size_t unacknowledged() const { return 0; }
/**
* Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags.
* Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
*
* @param exchange the exchange to publish to
* @param routingkey the routing key
* @param envelope the full envelope to send
* @param message the message to send
* @param size size of the message
* @param flags optional flags
* @return uint64_t
*/
uint64_t publish(const std::string_view &exchange, const std::string_view &routingKey, const Envelope &envelope, int flags = 0);
uint64_t publish(const std::string_view &exchange, const std::string_view &routingKey, const std::string_view &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
uint64_t publish(const std::string_view &exchange, const std::string_view &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
uint64_t publish(const std::string_view &exchange, const std::string_view &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
/**
* Close underlying channel
* @return Deferred&
*/
Deferred &close();
/**
* Install an error callback
* @param callback
*/
inline void onError(const ErrorCallback& callback) { return onError(ErrorCallback(callback)); }
void onError(ErrorCallback&& callback);
};
/**
* End of namespaces
*/
}
|