1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
/**
* Throttle.h
*
* A channel wrapper that publishes more messages as soon as there is more capacity.
*
* @author Michael van der Werve <michael.vanderwerve@mailerq.com>
* @copyright 2020 Copernica BV
*/
/**
* Header guard
*/
#pragma once
/**
* Includes
*/
#include <cstdint>
#include <set>
#include <queue>
#include "copiedbuffer.h"
#include "channelimpl.h"
#include "tagger.h"
/**
* Begin of namespaces
*/
namespace AMQP {
/**
* Forward declarations
*/
class Channel;
/**
* Class definition
*/
class Throttle : public Tagger
{
protected:
    /**
     *  ID of the last message that was sent out
     *  @var    uint64_t
     */
    uint64_t _last = 0;

    /**
     *  The configured throttle value, as returned by throttle() and changed
     *  by throttle(size_t).
     *  NOTE(review): presumably the maximum number of messages that may be
     *  in flight (sent but not yet acked/nacked) -- confirm in Throttle.cpp.
     *  @var    size_t
     */
    size_t _throttle;

    /**
     *  Messages that should still be sent out, in the order in which they
     *  were queued. Each entry pairs a 64-bit number with the buffered data.
     *  NOTE(review): the number is presumably the id passed to send() --
     *  confirm in Throttle.cpp.
     *  @var    std::queue
     */
    std::queue<std::pair<uint64_t, CopiedBuffer>> _queue;

    /**
     *  Set of open deliverytags. We want a normal set (not unordered_set) because
     *  removal will be cheaper for whole ranges.
     *
     *  The tags are stored as uint64_t, the same type that send(), onAck() and
     *  onNack() use for them, so that no tag is narrowed on platforms where
     *  size_t is smaller than 64 bits.
     *  @var    std::set
     */
    std::set<uint64_t> _open;

protected:
    /**
     *  Send method for a frame
     *  @param  id          identifier of the message the frame belongs to
     *  @param  frame       the frame to send
     *  @return bool        NOTE(review): presumably whether the frame was accepted -- confirm
     */
    virtual bool send(uint64_t id, const Frame &frame) override;

    /**
     *  Method that is called to report an error
     *  @param  message     description of the error
     */
    virtual void reportError(const char *message) override;

    /**
     *  Method that is called to report an ack
     *  @param  deliveryTag the tag that is acknowledged
     *  @param  multiple    NOTE(review): presumably "all tags up to and including
     *                      deliveryTag", as in the AMQP protocol -- confirm
     */
    virtual void onAck(uint64_t deliveryTag, bool multiple) override;

    /**
     *  Method that is called to report a nack
     *  @param  deliveryTag the tag that is negatively acknowledged
     *  @param  multiple    see onAck()
     */
    virtual void onNack(uint64_t deliveryTag, bool multiple) override;

public:
    /**
     *  Constructor. Warning: this takes control of the channel, there should be no extra
     *  handlers set on the channel (onError) and no further publishes should be done on the
     *  raw channel either. Doing this will cause the throttle to work incorrectly, as the
     *  counters are not properly updated.
     *  @param  channel     the channel to wrap
     *  @param  throttle    the initial throttle value
     */
    Throttle(Channel &channel, size_t throttle);

    /**
     *  Deleted copy constructor, deleted move constructor
     *  @param  other
     */
    Throttle(const Throttle &other) = delete;
    Throttle(Throttle &&other) = delete;

    /**
     *  Deleted copy assignment, deleted move assignment
     *  @param  other
     */
    Throttle &operator=(const Throttle &other) = delete;
    Throttle &operator=(Throttle &&other) = delete;

    /**
     *  Virtual destructor
     */
    virtual ~Throttle() = default;

    /**
     *  Method to check how many messages are still unacked. This is the number
     *  of open delivery tags plus the number of ids between the last sent id
     *  and _current (a counter that is not declared in this class, presumably
     *  inherited from Tagger).
     *  @return size_t
     */
    virtual size_t unacknowledged() const override { return _open.size() + (_current - _last - 1); }

    /**
     *  Get the throttle
     *  @return size_t
     */
    size_t throttle() const { return _throttle; }

    /**
     *  Set a new throttle. Note that this will only gradually take effect when set down, and
     *  the update is picked up on the next acknowledgement.
     *  @param  throttle    the new throttle value
     */
    void throttle(size_t throttle) { _throttle = throttle; }

    /**
     *  Flush the throttle. This flushes it _without_ taking the throttle into account, e.g. the messages
     *  are sent in a burst over the channel.
     *  @param  max         optional maximum, 0 is flush all
     *  @return size_t      NOTE(review): presumably the number of messages that were flushed -- confirm
     */
    size_t flush(size_t max = 0);
};
/**
* End of namespaces
*/
}
|