File: throttle.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (149 lines) | stat: -rw-r--r-- 3,439 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/**
 *  Throttle.h
 *  
 *  A channel wrapper that publishes more messages as soon as there is more capacity.
 *  
 *  @author Michael van der Werve <michael.vanderwerve@mailerq.com>
 *  @copyright 2020 Copernica BV
 */

/**
 *  Header guard
 */
#pragma once

/**
 *  Includes
 */
#include <cstdint>
#include <set>
#include <queue>
#include "copiedbuffer.h"
#include "channelimpl.h"
#include "tagger.h"

/**
 *  Begin of namespaces
 */
namespace AMQP {

/**
 *  Forward declarations
 *
 *  Channel is only used by reference in this header (as the first parameter
 *  of the Throttle constructor), so a declaration is sufficient at this point.
 */
class Channel; 

/**
 *  Class definition
 *
 *  A Tagger specialization that keeps a configurable limit (the "throttle"),
 *  a queue of buffered frames that still have to be sent, and the set of
 *  delivery tags that have been sent but for which no ack/nack was reported yet.
 *
 *  The object is neither copyable nor movable.
 */
class Throttle : public Tagger
{
protected:
    /**
     *  Last sent ID
     *  @var uint64_t
     */
    uint64_t _last = 0;

    /**
     *  Throttle: the limit that was passed to the constructor or to the
     *  throttle(size_t) setter
     *  @var size_t
     */
    size_t _throttle;

    /**
     *  Messages that should still be sent out. Every entry pairs a 64-bit ID
     *  with the buffered data belonging to it.
     *  @var    queue
     */
    std::queue<std::pair<uint64_t, CopiedBuffer>> _queue;

    /**
     *  Set of open deliverytags. We want a normal set (not unordered_set) because
     *  removal will be cheaper for whole ranges.
     *
     *  The elements are uint64_t, the same type that send(), onAck() and onNack()
     *  use for IDs / delivery tags. A size_t is not guaranteed to be 64 bits wide,
     *  so storing tags as size_t could truncate them on platforms with a narrower
     *  size_t.
     *  @var set
     */
    std::set<uint64_t> _open;


protected:
    /**
     *  Send method for a frame
     *  @param  id
     *  @param  frame
     *  @return bool    NOTE(review): presumably whether the frame was accepted
     *                  (sent or queued) - confirm against the implementation
     */
    virtual bool send(uint64_t id, const Frame &frame) override;

    /**
     *  Method that is called to report an error
     *  @param  message
     */
    virtual void reportError(const char *message) override;

    /**
     *  Method that is called to report an ack
     *  @param  deliveryTag
     *  @param  multiple
     */
    virtual void onAck(uint64_t deliveryTag, bool multiple) override;

    /**
     *  Method that is called to report a nack
     *  @param  deliveryTag
     *  @param  multiple
     */
    virtual void onNack(uint64_t deliveryTag, bool multiple) override;

public:
    /**
     *  Constructor. Warning: this takes control of the channel, there should be no extra
     *  handlers set on the channel (onError) and no further publishes should be done on the
     *  raw channel either. Doing this will cause the throttle to work incorrectly, as the
     *  counters are not properly updated.
     *  @param  channel 
     *  @param  throttle
     */
    Throttle(Channel &channel, size_t throttle);

    /**
     *  Deleted copy constructor, deleted move constructor
     *  @param other
     */
    Throttle(const Throttle &other) = delete;
    Throttle(Throttle &&other) = delete;

    /**
     *  Deleted copy assignment, deleted move assignment
     *  @param  other
     */
    Throttle &operator=(const Throttle &other) = delete;
    Throttle &operator=(Throttle &&other) = delete;

    /**
     *  Virtual destructor
     */
    virtual ~Throttle() = default;

    /**
     *  Method to check how many messages are still unacked.
     *
     *  The result is the number of open delivery tags plus (_current - _last - 1).
     *  NOTE(review): _current is not declared in this class; it is presumably
     *  inherited from Tagger, which would make the second term the number of IDs
     *  handed out after the last one that was sent - confirm against Tagger.
     *  @return size_t
     */
    virtual size_t unacknowledged() const override { return _open.size() + (_current - _last - 1); }

    /** 
     *  Get the throttle
     *  @return size_t
     */
    size_t throttle() const { return _throttle; }

    /**
     *  Set a new throttle. Note that this will only gradually take effect when set down, and
     *  the update is picked up on the next acknowledgement.
     *  @param  throttle    the new limit
     */
    void throttle(size_t throttle) { _throttle = throttle; }

    /**
     *  Flush the throttle. This flushes it _without_ taking the throttle into account, e.g. the messages
     *  are sent in a burst over the channel.
     *  @param  max     optional maximum, 0 is flush all
     *  @return size_t  NOTE(review): presumably the number of messages that were
     *                  flushed - confirm against the implementation
     */
    size_t flush(size_t max = 0);
};

/**
 *  End of namespaces
 */
}