File: deferredget.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (304 lines) | stat: -rw-r--r-- 9,445 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
/**
 *  DeferredGet.h
 *
 *  @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
 *  @copyright 2014 - 2018 Copernica BV
 */

/**
 *  Include guard
 */
#pragma once

/**
 *  Dependencies
 */
#include "deferredextreceiver.h"

/**
 *  Set up namespace
 */
namespace AMQP {

/**
 *  Class definition
 *
 *  This class derives from std::enable_shared_from_this, because it grabs a
 *  shared pointer to itself while a callback is running; otherwise onFinalize()
 *  would be called before the actual message is consumed.
 */
class DeferredGet : public DeferredExtReceiver, public std::enable_shared_from_this<DeferredGet>
{
private:
    /**
     *  Callback in case the queue is empty
     *  @var    EmptyCallback
     */
    EmptyCallback _emptyCallback;

    /**
     *  Callback with the number of messages still in the queue
     *  @var    CountCallback
     */
    CountCallback _countCallback;

    /**
     *  Report success for a get operation
     *  @param  messagecount    Number of messages left in the queue
     *  @param  deliveryTag     Delivery tag of the message coming in
     *  @param  redelivered     Was the message redelivered?
     */
    virtual const std::shared_ptr<Deferred> &reportSuccess(uint32_t messagecount, uint64_t deliveryTag, bool redelivered) override;

    /**
     *  Report success when queue was empty
     *  @return Deferred
     */
    virtual const std::shared_ptr<Deferred> &reportSuccess() const override;

    /**
     *  Get reference to self to prevent that object falls out of scope
     *  @return std::shared_ptr
     */
    virtual std::shared_ptr<DeferredReceiver> lock() override { return shared_from_this(); }

    /**
     *  Extended implementation of the complete method that is called when a message was fully received
     */
    virtual void complete() override;

    /**
     *  The channel implementation may call our
     *  private members and construct us
     */
    friend class ChannelImpl;
    friend class ConsumedMessage;

public:
    /**
     *  Protected constructor that can only be called
     *  from within the channel implementation
     *
     *  Note: this constructor _should_ be protected, but because make_shared
     *  will then not work, we have decided to make it public after all,
     *  because the work-around would result in not-so-easy-to-read code.
     *
     *  @param  channel     the channel implementation
     *  @param  failed      are we already failed?
     */
    DeferredGet(ChannelImpl *channel, bool failed = false) :
        DeferredExtReceiver(failed, channel) {}

public:
    /**
     *  Register a function to be called when a message arrives
     *  This fuction is also available as onReceived() and onMessage() because I always forget which name I gave to it
     *  @param  callback
     */
    inline DeferredGet &onSuccess(const MessageCallback& callback) { return onSuccess(MessageCallback(callback)); }
    DeferredGet &onSuccess(MessageCallback&& callback)
    {
        // store the callback
        _messageCallback = std::move(callback);

        // allow chaining
        return *this;
    }
    
    /**
     *  Register a function to be called when an error occurs. This should be defined, otherwise the base methods are used.
     *  @param  callback
     */
    inline DeferredGet &onError(const ErrorCallback& callback) { return onError(ErrorCallback(callback)); }
    DeferredGet &onError(ErrorCallback&& callback)
    {
        // store the callback
        _errorCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register a function to be called when a message arrives
     *  This fuction is also available as onSuccess() and onMessage() because I always forget which name I gave to it
     *  @param  callback    the callback to execute
     */
    inline DeferredGet &onReceived(const MessageCallback& callback) { return onReceived(MessageCallback(callback)); }
    DeferredGet &onReceived(MessageCallback&& callback)
    {
        // store callback
        _messageCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register a function to be called when a message arrives
     *  This fuction is also available as onSuccess() and onReceived() because I always forget which name I gave to it
     *  @param  callback    the callback to execute
     */
    inline DeferredGet &onMessage(const MessageCallback& callback) { return onMessage(MessageCallback(callback)); }
    DeferredGet &onMessage(MessageCallback&& callback)
    {
        // store callback
        _messageCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register a function to be called if no message could be fetched
     *  @param  callback    the callback to execute
     */
    inline DeferredGet &onEmpty(const EmptyCallback& callback) { return onEmpty(EmptyCallback(callback)); }
    DeferredGet &onEmpty(EmptyCallback&& callback)
    {
        // store callback
        _emptyCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register a function to be called when queue size information is known
     *  @param  callback    the callback to execute
     */
    inline DeferredGet &onCount(const CountCallback& callback) { return onCount(CountCallback(callback)); }
    DeferredGet &onCount(CountCallback&& callback)
    {
        // store callback
        _countCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register the function to be called when a new message is expected
     *
     *  @param  callback    The callback to invoke
     *  @return Same object for chaining
     */
    inline DeferredGet &onBegin(const StartCallback& callback) { return onBegin(StartCallback(callback)); }
    DeferredGet &onBegin(StartCallback&& callback)
    {
        // store callback
        _startCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register the function to be called when a new message is expected
     *
     *  @param  callback    The callback to invoke
     *  @return Same object for chaining
     */
    inline DeferredGet &onStart(const StartCallback& callback) { return onStart(StartCallback(callback)); }
    DeferredGet &onStart(StartCallback&& callback)
    {
        // store callback
        _startCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register a function that is called when the message size is known
     * 
     *  @param  callback    The callback to invoke for message headers
     *  @return Same object for chaining
     */
    inline DeferredGet &onSize(const SizeCallback& callback) { return onSize(SizeCallback(callback)); }
    DeferredGet &onSize(SizeCallback&& callback)
    {
        // store callback
        _sizeCallback = std::move(callback);
        
        // allow chaining
        return *this;
    }

    /**
     *  Register the function to be called when message headers come in
     *
     *  @param  callback    The callback to invoke for message headers
     *  @return Same object for chaining
     */
    inline DeferredGet &onHeaders(const HeaderCallback& callback) { return onHeaders(HeaderCallback(callback)); }
    DeferredGet &onHeaders(HeaderCallback&& callback)
    {
        // store callback
        _headerCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register the function to be called when a chunk of data comes in
     *
     *  Note that this function may be called zero, one or multiple times
     *  for each incoming message depending on the size of the message data.
     *
     *  If you install this callback you very likely also want to install
     *  the onComplete callback so you know when the last data part was
     *  received.
     *
     *  @param  callback    The callback to invoke for chunks of message data
     *  @return Same object for chaining
     */
    inline DeferredGet &onData(const DataCallback& callback) { return onData(DataCallback(callback)); }
    DeferredGet &onData(DataCallback&& callback)
    {
        // store callback
        _dataCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register a funtion to be called when a message was completely received
     *
     *  @param  callback    The callback to invoke
     *  @return Same object for chaining
     */
    inline DeferredGet &onComplete(const DeliveredCallback& callback) { return onComplete(DeliveredCallback(callback)); }
    DeferredGet &onComplete(DeliveredCallback&& callback)
    {
        // store callback
        _deliveredCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register a funtion to be called when a message was completely received
     *
     *  @param  callback    The callback to invoke
     *  @return Same object for chaining
     */
    inline DeferredGet &onDelivered(const DeliveredCallback& callback) { return onDelivered(DeliveredCallback(callback)); }
    DeferredGet &onDelivered(DeliveredCallback&& callback)
    {
        // store callback
        _deliveredCallback = std::move(callback);

        // allow chaining
        return *this;
    }
};

/**
 *  End of namespace
 */
}