File: reliable.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (287 lines) | stat: -rw-r--r-- 9,569 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
/**
 *  Reliable.h
 *  
 *  A channel wrapper based on AMQP::Tagger that allows message callbacks to be installed
 *  on the publish-confirms, to be called when a confirmation is received from RabbitMQ.
 * 
 *  You can also change the base class and use Reliable<Throttle> if you not only
 *  want to be notified about the publish-confirms, but want to use it for automatic
 *  throttling at the same time.
 *  
 *  @author Michael van der Werve <michael.vanderwerve@mailerq.com>
 *  @copyright 2020 - 2023 Copernica BV
 */

/**
 *  Header guard
 */
#pragma once

/**
 *  Includes
 */
#include "deferredpublish.h"
#include "tagger.h"
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <memory>
#include <string_view>
#include <type_traits>
#include <utility>

/**
 *  Begin of namespaces
 */
namespace AMQP { 

/**
 *  Class definition
 */
template <typename BASE=Tagger>
class Reliable : public BASE
{
private:
    // make sure it is a proper channel
    static_assert(std::is_base_of<Tagger, BASE>::value, "base should be derived from a confirmed channel.");

    /**
     *  Set of open deliverytags. We want a normal set (not unordered_set) because
     *  removal will be cheaper for whole ranges.
     *  @var size_t
     */
    std::map<size_t, std::shared_ptr<DeferredPublish>> _handlers;

    /**
     *  Called when the deliverytag(s) are acked
     *  @param  deliveryTag
     *  @param  multiple
     */
    void onAck(uint64_t deliveryTag, bool multiple) override
    {
        // monitor the object, watching for destruction since these ack/nack handlers
        // could destruct the object
        Monitor monitor(this);

        // single element is simple
        if (!multiple)
        {
            // find the element
            auto iter = _handlers.find(deliveryTag);

            // we did not find it (this should not be possible, unless somebody explicitly called)
            // the base-class publish methods for some reason.
            if (iter == _handlers.end()) return BASE::onAck(deliveryTag, multiple);

            // get the handler (we store it first so that we can remove it)
            auto handler = iter->second;

            // erase it from the map (we remove it before the call, because the callback might update
            // the _handlers and invalidate the iterator)
            _handlers.erase(iter);

            // call the ack handler
            handler->reportAck();

        }

        // do multiple at once
        else
        {
            // keep looping for as long as the object is in a valid state
            while (monitor && !_handlers.empty())
            {
                // get the first handler
                auto iter = _handlers.begin();
                
                // make sure this is the right deliverytag, if we've passed it we leap out
                if (iter->first > deliveryTag) break;

                // get the handler
                auto handler = iter->second;
                
                // remove it from the map (before we make a call to userspace, so that user space
                // can add even more handlers, without invalidating iterators)
                _handlers.erase(iter);

                // call the ack handler
                handler->reportAck();
            }
        }

        // make sure the object is still valid
        if (!monitor) return;

        // call base handler as well
        BASE::onAck(deliveryTag, multiple);
    }

    /**
     *  Called when the deliverytag(s) are nacked
     *  @param  deliveryTag
     *  @param  multiple
     */
    void onNack(uint64_t deliveryTag, bool multiple) override
    {
        // monitor the object, watching for destruction since these ack/nack handlers
        // could destruct the object
        Monitor monitor(this);

        // single element is simple
        if (!multiple)
        {
            // find the element
            auto iter = _handlers.find(deliveryTag);

            // we did not find it (this should not be possible, unless somebody explicitly called)
            // the base-class publish methods for some reason.
            if (iter == _handlers.end()) return BASE::onNack(deliveryTag, multiple);

            // get the handler (we store it first so that we can remove it)
            auto handler = iter->second;

            // erase it from the map (we remove it before the call, because the callback might update
            // the _handlers and invalidate the iterator)
            _handlers.erase(iter);

            // call the ack handler
            handler->reportNack();
        }

        // nack multiple elements
        else
        {
            // keep looping for as long as the object is in a valid state
            while (monitor && !_handlers.empty())
            {
                // get the first handler
                auto iter = _handlers.begin();
                
                // make sure this is the right deliverytag, if we've passed it we leap out
                if (iter->first > deliveryTag) break;

                // get the handler
                auto handler = iter->second;
                
                // remove it from the map (before we make a call to userspace, so that user space
                // can add even more handlers, without invalidating iterators)
                _handlers.erase(iter);

                // call the ack handler
                handler->reportNack();
            }
        }

        // if the object is no longer valid, return
        if (!monitor) return;

        // call the base handler
        BASE::onNack(deliveryTag, multiple);
    }

    /**
     *  Method that is called to report an error
     *  @param  message
     */
    void reportError(const char *message) override
    {
        // monitor the object, watching for destruction since these ack/nack handlers
        // could destruct the object
        Monitor monitor(this);

        // move the handlers out
        auto handlers = std::move(_handlers);

        // iterate over all the messages
        // call the handlers
        for (const auto &iter : handlers)
        {
            // call the handler
            iter.second->reportError(message);

            // if we were destructed in the meantime, we leap out
            if (!monitor) return;
        }

        // if the monitor is no longer valid, leap out
        if (!monitor) return;

        // call the base handler
        BASE::reportError(message);
    }

public:
    /**
     *  Constructor
     *  @param  channel
     *  @param  throttle
     */
    template <typename ...Args>
    Reliable(Args &&...args) : BASE(std::forward<Args>(args)...) {}

    /**
     *  Deleted copy constructor, deleted move constructor
     *  @param other
     */
    Reliable(const Reliable &other) = delete;
    Reliable(Reliable &&other) = delete;

    /**
     *  Deleted copy assignment, deleted move assignment
     *  @param  other
     */
    Reliable &operator=(const Reliable &other) = delete;
    Reliable &operator=(Reliable &&other) = delete;

    /**
     *  Virtual destructor
     */
    virtual ~Reliable() = default;

    /**
     *  Method to check how many messages are still unacked.
     *  @return size_t
     */
    virtual size_t unacknowledged() const override { return _handlers.size(); }

    /**
     *  Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. 
     *  Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
     * 
     *  @param  exchange    the exchange to publish to
     *  @param  routingkey  the routing key
     *  @param  envelope    the full envelope to send
     *  @param  message     the message to send
     *  @param  size        size of the message
     *  @param  flags       optional flags
     *  @return bool
     */
    DeferredPublish &publish(const std::string_view &exchange, const std::string_view &routingKey, const std::string_view &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
    DeferredPublish &publish(const std::string_view &exchange, const std::string_view &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
    DeferredPublish &publish(const std::string_view &exchange, const std::string_view &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }

    /**
     *  Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. 
     *  Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
     * 
     *  @param  exchange    the exchange to publish to
     *  @param  routingkey  the routing key
     *  @param  envelope    the full envelope to send
     *  @param  message     the message to send
     *  @param  size        size of the message
     *  @param  flags       optional flags
     */
    DeferredPublish &publish(const std::string_view &exchange, const std::string_view &routingKey, const Envelope &envelope, int flags = 0)
    {
        // publish the entire thing, and remember if it failed at any point
        uint64_t tag = BASE::publish(exchange, routingKey, envelope, flags);
        
        // create the publish deferred object, if we got no tag we failed
        auto handler = std::make_shared<DeferredPublish>(tag == 0);

        // add it to the open handlers
        _handlers[tag] = handler;

        // return the dereferenced handler 
        return *handler;
    }
};

/**
 *  End of namespaces
 */
}