File: deferredpublish.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (150 lines) | stat: -rw-r--r-- 3,883 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/**
 *  DeferredPublish.h
 *
 *  Deferred handler for the RabbitMQ-specific publisher-confirms mechanism,
 *  tracking the outcome of a single published message.
 *
 *  @author Michael van der Werve <michael.vanderwerve@mailerq.com>
 *  @copyright 2020 Copernica BV
 */

/**
 *  Include guard
 */
#pragma once

/**
 *  Set up namespace
 */
namespace AMQP {

/**
 *  We extend from the default deferred and add the per-message onAck(),
 *  onNack() and onLost() callbacks
 */
class DeferredPublish : public Deferred
{
private:
    /**
     *  Callback to execute when server confirms that message is processed
     *  @var    AckCallback
     */
    PublishAckCallback _ackCallback;

    /**
     * Callback to execute when server sends negative acknowledgement
     * @var     NackCallback
     */
    PublishNackCallback _nackCallback;

    /**
     *  Callback to execute when message is lost (nack / error)
     *  @var    LostCallback
     */
    PublishLostCallback _lostCallback;

    /**
     *  Report an ack, calls the callback.
     */
    void reportAck() 
    {
        // check if the callback is set 
        if (_ackCallback) _ackCallback(); 
    }

    /**
     *  Report an nack, calls the callback if set.
     */
    void reportNack() 
    {
        // check if the callback is set 
        if (_nackCallback) _nackCallback(); 

        // message is 'lost'
        if (_lostCallback) _lostCallback();
    }

    /**
     *  Indicate failure
     *  @param  error           Description of the error that occured
     */
    void reportError(const char *error)
    {
        // from this moment on the object should be listed as failed
        _failed = true;

        // message is lost
        if (_lostCallback) _lostCallback();

        // execute callbacks if registered
        if (_errorCallback) _errorCallback(error);
    }

    /**
     *  The wrapped confirmed channel implementation may call our
     *  private members and construct us
     */
    template <class T>
    friend class Reliable;


public:
    /**
     *  Protected constructor that can only be called
     *  from within the channel implementation
     *
     *  Note: this constructor _should_ be protected, but because make_shared
     *  will then not work, we have decided to make it public after all,
     *  because the work-around would result in not-so-easy-to-read code.
     *
     *  @param  boolean     are we already failed?
     */
    DeferredPublish(bool failed = false) : Deferred(failed) {}

public:
    /**
     *  Callback that is called when the broker confirmed message publication
     *  @param  callback    the callback to execute
     */
    inline DeferredPublish &onAck(const PublishAckCallback& callback) { return onAck(PublishAckCallback(callback)); }
    DeferredPublish &onAck(PublishAckCallback&& callback)
    {
        // store callback
        _ackCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Callback that is called when the broker denied message publication
     *  @param  callback    the callback to execute
     */
    inline DeferredPublish &onNack(const PublishNackCallback& callback) { return onNack(PublishNackCallback(callback)); }
    DeferredPublish &onNack(PublishNackCallback&& callback)
    {
        // store callback
        _nackCallback = std::move(callback);

        // allow chaining
        return *this;
    }

    /**
     *  Callback that is called when a message is lost, either through RabbitMQ
     *  rejecting it or because of a channel error
     *  @param  callback    the callback to execute
     */
    inline DeferredPublish &onLost(const PublishLostCallback& callback) { return onLost(PublishLostCallback(callback)); }
    DeferredPublish &onLost(PublishLostCallback&& callback)
    {
        // store callback
        _lostCallback = std::move(callback);

        // allow chaining
        return *this;
    }
};

/**
 *  End namespace
 */
}