File: deferredreceiver.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (145 lines) | stat: -rw-r--r-- 2,962 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/**
 *  DeferredReceiver.h
 *
 *  Base class for the deferred consumer, the deferred get and the
 *  deferred publisher (that may receive returned messages)
 *
 *  @copyright 2016 - 2018 Copernica B.V.
 */

/**
 *  Include guard
 */
#pragma once

/**
 *  Dependencies
 */
#include "deferred.h"
#include "stack_ptr.h"
#include "message.h"

/**
 *  Start namespace
 */
namespace AMQP {

/**
 *  Forward declarations of the frame classes
 *
 *  BasicGetOKFrame, BasicHeaderFrame and BodyFrame are named in the friend
 *  declarations of DeferredReceiver, and the latter two are also taken by
 *  reference by its process() methods, so incomplete types are sufficient.
 *
 *  NOTE(review): BasicDeliverFrame is not referenced anywhere else in this
 *  header; presumably it is kept for code that includes this file - verify
 *  before removing it.
 */
class BasicDeliverFrame;
class BasicGetOKFrame;
class BasicHeaderFrame;
class BodyFrame;

/**
 *  Base class for the deferred consumer, the deferred get and the
 *  deferred publisher (that may receive returned messages)
 *
 *  The class is abstract: lock() and complete() are pure virtual and must
 *  be supplied by the derived class. The constructor is protected, so
 *  objects can only be created through a derived class.
 */
class DeferredReceiver : public Deferred
{
private:
    /**
     *  Size of the body of the current message, starts out at zero
     *
     *  NOTE(review): presumably maintained by the process() methods below,
     *  their definitions are not part of this header.
     *
     *  @var    uint64_t
     */
    uint64_t _bodySize = 0;


protected:
    /**
     *  Initialize the object for a new message
     *
     *  NOTE(review): this comment used to read "to send out a message", but
     *  this class is the receiving side and the parameters describe a message
     *  that was already published - confirm against the implementation.
     *
     *  The method is virtual, so derived classes may override it. It is only
     *  declared here, the definition lives elsewhere.
     *
     *  @param  exchange            the exchange to which the message was published
     *  @param  routingkey          the routing key that was used to publish the message
     */
    virtual void initialize(const std::string &exchange, const std::string &routingkey);
    
    /**
     *  Get a shared pointer to self, to prevent that the object falls out
     *  of scope while it is still being used
     *
     *  Pure virtual: has to be implemented by the derived class.
     *
     *  @return std::shared_ptr<DeferredReceiver>
     */
    virtual std::shared_ptr<DeferredReceiver> lock() = 0;
    
    /**
     *  Indicate that a message was done
     *
     *  Pure virtual: has to be implemented by the derived class.
     */
    virtual void complete() = 0;

private:
    /**
     *  Process the message headers
     *
     *  Private: can only be called by this class and by the friends
     *  that are listed below.
     *
     *  @param  frame   The frame to process
     */
    void process(BasicHeaderFrame &frame);

    /**
     *  Process the message data
     *
     *  Private: can only be called by this class and by the friends
     *  that are listed below.
     *
     *  @param  frame   The frame to process
     */
    void process(BodyFrame &frame);

    /**
     *  The channel implementation and the frame classes are friends, which
     *  gives them access to the private members of this class (like the
     *  process() methods above), so that frames may be processed
     */
    friend class ChannelImpl;
    friend class BasicGetOKFrame;
    friend class BasicHeaderFrame;
    friend class BodyFrame;

protected:
    /**
     *  The channel to which the consumer is linked
     *
     *  This is a raw pointer that is set by the constructor; the (defaulted)
     *  destructor of this class does not delete it.
     *
     *  @var    ChannelImpl
     */
    ChannelImpl *_channel;

    /**
     *  Callback for new message
     *  @var    StartCallback
     */
    StartCallback _startCallback;

    /**
     *  Callback that is called when size of the message is known
     *  @var    SizeCallback
     */
    SizeCallback _sizeCallback;

    /**
     *  Callback for incoming headers
     *  @var    HeaderCallback
     */
    HeaderCallback _headerCallback;

    /**
     *  Callback for when a chunk of data comes in
     *  @var    DataCallback
     */
    DataCallback _dataCallback;

    /**
     *  The message that we are currently receiving
     *  @var    stack_ptr<Message>
     */
    stack_ptr<Message> _message;

    /**
     *  Constructor
     *
     *  Protected, so that only derived classes can construct the object.
     *  The callbacks and the message are default-constructed.
     *
     *  @param  failed  Have we already failed? (passed on to the Deferred base class)
     *  @param  channel The channel we are consuming on (stored in _channel)
     */
    DeferredReceiver(bool failed, ChannelImpl *channel) : 
        Deferred(failed), _channel(channel) {}

public:
    /**
     *  Destructor (virtual and defaulted, this class has no cleanup
     *  logic of its own)
     */
    virtual ~DeferredReceiver() = default;
};

/**
 *  End namespace
 */
}