File: deferredconsumer.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (323 lines) | stat: -rw-r--r-- 10,123 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
/**
 *  DeferredConsumer.h
 *
 *  Deferred callback for consumers
 *
 *  @copyright 2014 - 2022 Copernica BV
 */

/**
 *  Include guard
 */
#pragma once

/**
 *  Dependencies
 */
#include "deferredextreceiver.h"

/**
 *  Set up namespace
 */
namespace AMQP {
    
/**
 *  Forward declararions
 */
class BasicDeliverFrame;

/**
 *  We extend from the default deferred and add extra functionality
 */
class DeferredConsumer : public DeferredExtReceiver, public std::enable_shared_from_this<DeferredConsumer>
{
private:
    /**
     *  Callback to execute when consumption has started
     *  @var    ConsumeCallback
     */
    ConsumeCallback _consumeCallback;

    /**
     *  Callback to excute when the server has cancelled the consumer
     *  @var    CancelCallback
     */
    CancelCallback _cancelCallback;

    /**
     *  Process a delivery frame
     *
     *  @param  frame   The frame to process
     */
    void process(BasicDeliverFrame &frame);

    /**
     *  Report success for frames that report start consumer operations
     *  @param  name            Consumer tag that is started
     *  @return Deferred
     */
    virtual const std::shared_ptr<Deferred> &reportSuccess(const std::string &name) override;

    /**
     *  Report that the server has cancelled this consumer
     *  @param  namae           The consumer tag
     */
    void reportCancelled(const std::string &name)
    {
        // report
        if (_cancelCallback) _cancelCallback(name);
    }

    /**
     *  Get reference to self to prevent that object falls out of scope
     *  @return std::shared_ptr
     */
    virtual std::shared_ptr<DeferredReceiver> lock() override { return shared_from_this(); }

    /**
     *  The channel implementation may call our
     *  private members and construct us
     */
    friend class ChannelImpl;
    friend class ConsumedMessage;
    friend class BasicDeliverFrame;

public:
    /**
     *  Constructor that should only be called from within the channel implementation
     *
     *  Note: this constructor _should_ be protected, but because make_shared
     *  will then not work, we have decided to make it public after all,
     *  because the work-around would result in not-so-easy-to-read code.
     *
     *  @param  channel     the channel implementation
     *  @param  failed      are we already failed?
     */
    DeferredConsumer(ChannelImpl *channel, bool failed = false) :
        DeferredExtReceiver(failed, channel) {}

public:
    /**
     *  Register a callback function that gets called when the consumer is
     *  started. In the callback you will for receive the consumer-tag
     *  that you need to later stop the consumer
     *  @param  callback
     */
    inline DeferredConsumer &onSuccess(const ConsumeCallback& callback) { return onSuccess(ConsumeCallback(callback)); }
    DeferredConsumer &onSuccess(ConsumeCallback&& callback)
    {
        // store the callback
        _consumeCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register the function that is called when the consumer starts.
     *  It is recommended to use the onSuccess() method mentioned above
     *  since that will also pass the consumer-tag as parameter.
     *  @param  callback
     */
    inline DeferredConsumer &onSuccess(const SuccessCallback& callback) { return onSuccess(SuccessCallback(callback)); }
    DeferredConsumer &onSuccess(SuccessCallback&& callback)
    {
        // call base
        Deferred::onSuccess(std::move(std::move(callback)));

        // allow chaining
        return *this;
    }

    /**
     *  Register a function to be called when a full message is received
     *  @param  callback    the callback to execute
     */
    inline DeferredConsumer &onReceived(const MessageCallback& callback) { return onReceived(MessageCallback(callback)); }
    DeferredConsumer &onReceived(MessageCallback&& callback)
    {
        // store callback
        _messageCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Alias for onReceived() (see above)
     *  @param  callback    the callback to execute
     */
    inline DeferredConsumer &onMessage(const MessageCallback& callback) { return onMessage(MessageCallback(callback)); }
    DeferredConsumer &onMessage(MessageCallback&& callback)
    {
        // store callback
        _messageCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  RabbitMQ sends a message in multiple frames to its consumers.
     *  The AMQP-CPP library collects these frames and merges them into a 
     *  single AMQP::Message object that is passed to the callback that
     *  you can set with the onReceived() or onMessage() methods (see above).
     * 
     *  However, you can also write your own algorithm to merge the frames.
     *  In that case you can install callbacks to handle the frames. Every
     *  message is sent in a number of frames:
     * 
     *      - a begin frame that marks the start of the message
     *      - an optional header if the message was sent with an envelope
     *      - zero or more data frames (usually 1, but more for large messages)
     *      - an end frame to mark the end of the message.
     *  
     *  To install handlers for these frames, you can use the onBegin(), 
     *  onHeaders(), onData() and onComplete() methods.
     * 
     *  If you just rely on the onReceived() or onMessage() callbacks, you
     *  do not need any of the methods below this line.
     */

    /**
     *  Register the function that is called when the start frame of a new 
     *  consumed message is received
     *
     *  @param  callback    The callback to invoke
     *  @return Same object for chaining
     */
    inline DeferredConsumer &onBegin(const StartCallback& callback) { return onBegin(StartCallback(callback)); }
    DeferredConsumer &onBegin(StartCallback&& callback)
    {
        // store callback
        _startCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register the function that is called when the start frame of a new 
     *  consumed message is received
     *
     *  @param  callback    The callback to invoke
     *  @return Same object for chaining
     */
    inline DeferredConsumer &onStart(const StartCallback& callback) { return onStart(StartCallback(callback)); }
    DeferredConsumer &onStart(StartCallback&& callback)
    {
        // store callback
        _startCallback = std::move(callback);

        // allow chaining
        return *this;
    }
    
    /**
     *  Register a function that is called when the message size is known
     * 
     *  @param  callback    The callback to invoke for message headers
     *  @return Same object for chaining
     */
    inline DeferredConsumer &onSize(const SizeCallback& callback) { return onSize(SizeCallback(callback)); }
    DeferredConsumer &onSize(SizeCallback&& callback)
    {
        // store callback
        _sizeCallback = std::move(callback);
        
        // allow chaining
        return *this;
    }

    /**
     *  Register the function that is called when message headers come in
     *
     *  @param  callback    The callback to invoke for message headers
     *  @return Same object for chaining
     */
    inline DeferredConsumer &onHeaders(const HeaderCallback& callback) { return onHeaders(HeaderCallback(callback)); }
    DeferredConsumer &onHeaders(HeaderCallback&& callback)
    {
        // store callback
        _headerCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register the function to be called when a chunk of data comes in
     *
     *  Note that this function may be called zero, one or multiple times
     *  for each incoming message depending on the size of the message data.
     *
     *  If you install this callback you very likely also want to install
     *  the onComplete callback so you know when the last data part was
     *  received.
     *
     *  @param  callback    The callback to invoke for chunks of message data
     *  @return Same object for chaining
     */
    inline DeferredConsumer &onData(const DataCallback& callback) { return onData(DataCallback(callback)); }
    DeferredConsumer &onData(DataCallback&& callback)
    {
        // store callback
        _dataCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register a funtion to be called when a message was completely received
     *
     *  @param  callback    The callback to invoke
     *  @return Same object for chaining
     */
    inline DeferredConsumer &onComplete(const DeliveredCallback& callback) { return onComplete(DeliveredCallback(callback)); }
    DeferredConsumer &onComplete(DeliveredCallback&& callback)
    {
        // store callback
        _deliveredCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register a funtion to be called when a message was completely received
     *
     *  @param  callback    The callback to invoke
     *  @return Same object for chaining
     */
    inline DeferredConsumer &onDelivered(const DeliveredCallback& callback) { return onDelivered(DeliveredCallback(callback)); }
    DeferredConsumer &onDelivered(DeliveredCallback&& callback)
    {
        // store callback
        _deliveredCallback = std::move(callback);

        // allow chaining
        return *this;
    }
    
    /**
     *  Register a funtion to be called when the server cancelled the consumer
     *
     *  @param  callback    The callback to invoke
     *  @return Same object for chaining
     */
    inline DeferredConsumer &onCancelled(const CancelCallback& callback) { return onCancelled(CancelCallback(callback)); }
    DeferredConsumer &onCancelled(CancelCallback&& callback)
    {
        // store callback
        _cancelCallback = std::move(callback);

        // allow chaining
        return *this;
    }
};

/**
 *  End namespace
 */
}