File: deferredrecall.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (249 lines) | stat: -rw-r--r-- 6,967 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
/**
 *  DeferredRecall.h
 * 
 *  Class that can be used to install callback methods that define how
 *  returned messages should be handled.
 * 
 *  @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
 *  @copyright 2018 - 2020 Copernica BV
 */

/**
 *  Include guard
 */
#pragma once

/**
 *  Begin of namespace
 */
namespace AMQP {
    
/**
 *  Forward declarations
 */
class ChannelImpl;

/**
 *  Class definition
 */
class DeferredRecall : public DeferredReceiver, public std::enable_shared_from_this<DeferredRecall>
{
private:
    /**
     *  The error code
     *  @var int16_t
     */
    int16_t _code = 0;
    
    /**
     *  The error message
     *  @var std::string
     */
    std::string _description;

    /**
     *  Callback that is called when a message is returned
     *  @var BounceCallback
     */
    BounceCallback _bounceCallback;

    /**
     *  Begin of a bounced message
     *  @var ReturnCallback
     */
    ReturnCallback _beginCallback;
    
    /**
     *  End of a bounced message
     *  @var ReturnedCallback
     */
    ReturnedCallback _completeCallback;

    /**
     *  Process a return frame
     *
     *  @param  frame   The frame to process
     */
    void process(BasicReturnFrame &frame);

    /**
     *  Get reference to self to prevent that object falls out of scope
     *  @return std::shared_ptr
     */
    virtual std::shared_ptr<DeferredReceiver> lock() override { return shared_from_this(); }

    /**
     *  Extended implementation of the complete method that is called when a message was fully received
     */
    virtual void complete() override;
    
    /**
     *  Classes that can access private members
     */
    friend class BasicReturnFrame;

public:
    /**
     *  Constructor that should only be called from within the channel implementation
     *
     *  Note: this constructor _should_ be protected, but because make_shared
     *  will then not work, we have decided to make it public after all,
     *  because the work-around would result in not-so-easy-to-read code.
     *
     *  @param  channel     the channel implementation
     *  @param  failed      are we already failed?
     */
    DeferredRecall(ChannelImpl *channel, bool failed = false) :
        DeferredReceiver(failed, channel) {}
        
public:
    /**
     *  Register a function to be called when a full message is returned
     *  @param  callback    the callback to execute
     */
    inline DeferredRecall &onReceived(const BounceCallback& callback) { return onReceived(BounceCallback(callback)); }
    DeferredRecall &onReceived(BounceCallback&& callback)
    {
        // store callback
        _bounceCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Alias for onReceived() (see above)
     *  @param  callback    the callback to execute
     */
    inline DeferredRecall &onMessage(const BounceCallback& callback) { return onMessage(BounceCallback(callback)); }
    DeferredRecall &onMessage(BounceCallback&& callback)
    {
        // store callback
        _bounceCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Alias for onReceived() (see above)
     *  @param  callback    the callback to execute
     */
    inline DeferredRecall &onReturned(const BounceCallback& callback) { return onReturned(BounceCallback(callback)); }
    DeferredRecall &onReturned(BounceCallback&& callback)
    {
        // store callback
        _bounceCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Alias for onReceived() (see above)
     *  @param  callback    the callback to execute
     */
    inline DeferredRecall &onBounced(const BounceCallback& callback) { return onBounced(BounceCallback(callback)); }
    DeferredRecall &onBounced(BounceCallback&& callback)
    {
        // store callback
        _bounceCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register the function that is called when the start frame of a new 
     *  consumed message is received
     *
     *  @param  callback    The callback to invoke
     *  @return Same object for chaining
     */
    inline DeferredRecall &onBegin(const ReturnCallback& callback) { return onBegin(ReturnCallback(callback)); }
    DeferredRecall &onBegin(ReturnCallback&& callback)
    {
        // store callback
        _beginCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register a function that is called when the message size is known
     * 
     *  @param  callback    The callback to invoke for message headers
     *  @return Same object for chaining
     */
    inline DeferredRecall &onSize(const SizeCallback& callback) { return onSize(SizeCallback(callback)); }
    DeferredRecall &onSize(SizeCallback&& callback)
    {
        // store callback
        _sizeCallback = std::move(callback);
        
        // allow chaining
        return *this;
    }

    /**
     *  Register the function that is called when message headers come in
     *
     *  @param  callback    The callback to invoke for message headers
     *  @return Same object for chaining
     */
    inline DeferredRecall &onHeaders(const HeaderCallback& callback) { return onHeaders(HeaderCallback(callback)); }
    DeferredRecall &onHeaders(HeaderCallback&& callback)
    {
        // store callback
        _headerCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register the function to be called when a chunk of data comes in
     *
     *  Note that this function may be called zero, one or multiple times
     *  for each incoming message depending on the size of the message data.
     *
     *  If you install this callback you very likely also want to install
     *  the onComplete callback so you know when the last data part was
     *  received.
     *
     *  @param  callback    The callback to invoke for chunks of message data
     *  @return Same object for chaining
     */
    inline DeferredRecall &onData(const DataCallback& callback) { return onData(DataCallback(callback)); }
    DeferredRecall &onData(DataCallback&& callback)
    {
        // store callback
        _dataCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Register a funtion to be called when a message was completely received
     *
     *  @param  callback    The callback to invoke
     *  @return Same object for chaining
     */
    inline DeferredRecall &onComplete(const ReturnedCallback& callback) { return onComplete(ReturnedCallback(callback)); }
    DeferredRecall &onComplete(ReturnedCallback&& callback)
    {
        // store callback
        _completeCallback = std::move(callback);

        // allow chaining
        return *this;
    }
};
    
/**
 *  End of namespace
 */
}