File: tagger.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (153 lines) | stat: -rw-r--r-- 4,381 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/**
 *  Tagger.h
 *  
 *  Base class that enables publisher confirms and keeps track of the sent 
 *  messages. You can wrap this class around a AMQP::Channel object and use
 *  this object for publishing instead. This is a base class that you cannot
 *  use directly. You should instead use:
 * 
 *  - Throttle: to throttle traffic to prevent flooding RabbitMQ
 *  - Reliable<Tagger>: to be notified about publish-confirms via callbacks
 *  - Reliable<Throttle>: to have throttle + notifications via callbacks
 *  
 *  @author Michael van der Werve <michael.vanderwerve@mailerq.com>
 *  @copyright 2020 - 2023 Copernica BV
 */

/**
 *  Header guard
 */
#pragma once

/**
 *  Includes
 */
#include "deferredpublish.h"
#include <memory>

/**
 *  Begin of namespaces
 */
namespace AMQP { 

/**
 *  Class definition
 */
class Tagger : public Watchable
{
protected:
    /**
     *  The implementation for the channel
     *  @var    std::shared_ptr<ChannelImpl>
     */
    std::shared_ptr<ChannelImpl> _implementation;

    /**
     *  Current id, always starts at 1.
     *  @var uint64_t
     */
    uint64_t _current = 1;

    /**
     *  Deferred to set up on the close
     *  @var std::shared_ptr<Deferred>
     */
    std::shared_ptr<Deferred> _close;

    /**
     *  Callback to call when an error occurred
     *  @var ErrorCallback
     */
    ErrorCallback _errorCallback;


protected:
    /**
     *  Send method for a frame
     *  @param  id
     *  @param  frame
     */
    virtual bool send(uint64_t id, const Frame &frame);

    /**
     *  Method that is called to report an error.
     *  @param  message
     */
    virtual void reportError(const char *message);

    /**
     *  Method that gets called on ack/nack. If these methods are overridden, make sure 
     *  to also call the base class methods.
     *  @param  deliveryTag
     *  @param  multiple
     */
    virtual void onAck(uint64_t deliveryTag, bool multiple);
    virtual void onNack(uint64_t deliveryTag, bool multiple);

public:
    /**
     *  Constructor
     *  @param  channel
     */
    Tagger(AMQP::Channel &channel);

    /**
     *  Deleted copy constructor, deleted move constructor
     *  @param other
     */
    Tagger(const Tagger &other) = delete;
    Tagger(Tagger &&other) = delete;

    /**
     *  Deleted copy assignment, deleted move assignment
     *  @param  other
     */
    Tagger &operator=(const Tagger &other) = delete;
    Tagger &operator=(Tagger &&other) = delete;

    /**
     *  Virtual destructor
     */
    virtual ~Tagger();

    /**
     *  Method to check how many messages are still unacked.
     *  @return size_t
     */
    virtual size_t unacknowledged() const { return 0; }

    /**
     *  Publish a message to an exchange. See amqpcpp/channel.h for more details on the flags. 
     *  Delays actual publishing depending on the publisher confirms sent by RabbitMQ.
     * 
     *  @param  exchange    the exchange to publish to
     *  @param  routingkey  the routing key
     *  @param  envelope    the full envelope to send
     *  @param  message     the message to send
     *  @param  size        size of the message
     *  @param  flags       optional flags
     *  @return uint64_t
     */
    uint64_t publish(const std::string_view &exchange, const std::string_view &routingKey, const Envelope &envelope, int flags = 0);
    uint64_t publish(const std::string_view &exchange, const std::string_view &routingKey, const std::string_view &message, int flags = 0) { return publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); }
    uint64_t publish(const std::string_view &exchange, const std::string_view &routingKey, const char *message, size_t size, int flags = 0) { return publish(exchange, routingKey, Envelope(message, size), flags); }
    uint64_t publish(const std::string_view &exchange, const std::string_view &routingKey, const char *message, int flags = 0) { return publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }

    /**
     *  Close underlying channel
     *  @return Deferred&
     */
    Deferred &close();

    /**
     *  Install an error callback
     *  @param  callback
     */
    inline void onError(const ErrorCallback& callback) { return onError(ErrorCallback(callback)); }
    void onError(ErrorCallback&& callback);
};

/**
 *  End of namespaces
 */
}