/**
 * DeferredReceiver.h
 *
 * Base class for the deferred consumer, the deferred get and the
 * deferred publisher (that may receive returned messages)
 *
 * @copyright 2016 - 2018 Copernica B.V.
 */
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include "deferred.h"
#include "stack_ptr.h"
#include "message.h"
/**
* Start namespace
*/
namespace AMQP {
/**
 * Forward declarations
 *
 * Frame classes that this header only needs to know by name.
 * BasicGetOKFrame, BasicHeaderFrame and BodyFrame are named as friends of
 * DeferredReceiver, and BasicHeaderFrame and BodyFrame are also taken by
 * reference in its process() methods. BasicDeliverFrame is declared here
 * but is not referenced by the declarations visible in this header.
 */
class BasicDeliverFrame;
class BasicGetOKFrame;
class BasicHeaderFrame;
class BodyFrame;
/**
 * Common base for deferred objects that receive messages.
 *
 * Derives from Deferred and adds the state and callbacks that are needed
 * while a message is being received: the channel, the message under
 * construction, and the start/size/header/data callbacks.
 */
class DeferredReceiver : public Deferred
{
protected:
    /**
     * Initialize the object for a message
     * @param exchange    the exchange to which the message was published
     * @param routingkey  the routing key that was used to publish the message
     */
    virtual void initialize(const std::string &exchange, const std::string &routingkey);

    /**
     * Obtain a shared pointer to this object, so that the object is
     * prevented from falling out of scope while the pointer is held
     * @return std::shared_ptr
     */
    virtual std::shared_ptr<DeferredReceiver> lock() = 0;

    /**
     * Indicate that a message was done
     */
    virtual void complete() = 0;

private:
    /**
     * Size of the body of the current message
     * @var uint64_t
     */
    uint64_t _bodySize{0};

    /**
     * Process the message data
     * @param frame  The body frame to process
     */
    void process(BodyFrame &frame);

    /**
     * Process the message headers
     * @param frame  The header frame to process
     */
    void process(BasicHeaderFrame &frame);

    /**
     * Classes that are granted access to the private members of this
     * class, which includes the process() methods above
     */
    friend class BodyFrame;
    friend class BasicHeaderFrame;
    friend class BasicGetOKFrame;
    friend class ChannelImpl;

protected:
    /**
     * The channel to which the consumer is linked
     * @var ChannelImpl
     */
    ChannelImpl *_channel;

    /**
     * Callback for new message
     * @var StartCallback
     */
    StartCallback _startCallback;

    /**
     * Callback that is called when size of the message is known
     * @var SizeCallback
     */
    SizeCallback _sizeCallback;

    /**
     * Callback for incoming headers
     * @var HeaderCallback
     */
    HeaderCallback _headerCallback;

    /**
     * Callback for when a chunk of data comes in
     * @var DataCallback
     */
    DataCallback _dataCallback;

    /**
     * The message that we are currently receiving
     * @var stack_ptr<Message>
     */
    stack_ptr<Message> _message;

    /**
     * Constructor
     *
     * Forwards the failed flag to the Deferred base class and stores the
     * channel pointer; all other members keep their default state.
     *
     * @param failed   Have we already failed?
     * @param channel  The channel we are consuming on
     */
    DeferredReceiver(bool failed, ChannelImpl *channel)
        : Deferred(failed)
        , _channel(channel)
    {
    }

public:
    /**
     * Destructor (virtual, compiler-generated)
     */
    virtual ~DeferredReceiver() = default;
};
/**
 * End of namespace
 */
}