/**
* Class describing a mid-level Amqp connection
*
* @copyright 2014 - 2018 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class definition
*/
class Connection
{
private:
/**
* The actual implementation
* @var ConnectionImpl
*/
ConnectionImpl _implementation;
public:
/**
* Construct an AMQP object based on full login data
*
* The first parameter is a handler object. This handler class is
* an interface that should be implemented by the caller.
*
* @param handler Connection handler
* @param login Login data
* @param vhost Vhost to use
*/
Connection(ConnectionHandler *handler, const Login &login, const std::string &vhost) : _implementation(this, handler, login, vhost) {}
/**
* Construct with default vhost
* @param handler Connection handler
* @param login Login data
*/
Connection(ConnectionHandler *handler, const Login &login) : _implementation(this, handler, login, "/") {}
/**
* Construct an AMQP object with default login data and a given vhost
* @param handler Connection handler
* @param vhost Vhost to use
*/
Connection(ConnectionHandler *handler, const std::string &vhost) : _implementation(this, handler, Login(), vhost) {}
/**
* Construct an AMQP object with default login data and default vhost
* @param handler Connection handler
*/
Connection(ConnectionHandler *handler) : _implementation(this, handler, Login(), "/") {}
/**
* No copying, we do not support having two identical connection objects
* @param connection
*/
Connection(const Connection &connection) = delete;
/**
* Destructor
*/
virtual ~Connection() {}
/**
* No assignments of other connections
* @param connection
* @return Connection
*/
Connection &operator=(const Connection &connection) = delete;
/**
* Retrieve the login data
* @return Login
*/
const Login &login() const
{
return _implementation.login();
}
/**
* Retrieve the vhost
* @return string
*/
const std::string &vhost() const
{
return _implementation.vhost();
}
/**
* Send a ping/heartbeat over the connection to keep it alive
* @return bool
*/
bool heartbeat()
{
return _implementation.heartbeat();
}
/**
* Parse data that was received from RabbitMQ
*
* Every time that data comes in from RabbitMQ, you should call this method to parse
* the incoming data and have it handled by the AMQP library. This method returns the number
* of bytes that were processed.
*
* If not all bytes could be processed because it only contained a partial frame, you should
* call this same method later on when more data is available. The AMQP library does not do
* any buffering, so it is up to the caller to ensure that the old data is also passed in that
* later call.
*
* @param buffer buffer to decode
* @param size size of the buffer to decode
* @return number of bytes that were processed
*/
uint64_t parse(const char *buffer, size_t size)
{
return _implementation.parse(ByteBuffer(buffer, size));
}
/**
* Parse data that was received from RabbitMQ
*
* Every time that data comes in from RabbitMQ, you should call this method to parse
* the incoming data and have it handled by the AMQP library. This method returns the number
* of bytes that were processed.
*
* If not all bytes could be processed because it only contained a partial frame, you should
* call this same method later on when more data is available. The AMQP library does not do
* any buffering, so it is up to the caller to ensure that the old data is also passed in that
* later call.
*
* This method accepts a buffer object. This is an interface that is defined by the AMQP
* library, that can be implemented by you to allow faster access to a buffer.
*
* @param buffer buffer to decode
* @return number of bytes that were processed
*/
uint64_t parse(const Buffer &buffer)
{
return _implementation.parse(buffer);
}
/**
* Report that the connection was lost in the middle of an operation
*
* The AMQP protocol normally has a nice closing handshake, and a connection
* is elegantly closed via calls to the close() and parse() methods. The parse()
* method recognizes the close-confirmation and will report this to the handler.
* However, if you notice yourself that the connection is lost in the middle of
* an operation (for example due to a crashing RabbitMQ server), you should
* explicitly tell the connection object about it, so that it can cancel all
* pending operations. For all pending operations the error and finalize callbacks
* will be called. The ConnectionHandler::onError() method will however _not_ be
* called.
*
* @param message the message that has to be passed to all error handlers
* @return bool false if the connection had already failed
*/
bool fail(const char *message)
{
return _implementation.fail(message);
}
/**
* Max frame size
*
* If you allocate memory to receive data that you are going to pass to the parse() method,
* it might be useful to know the max frame size. The parse() method processes
* one frame at a time, so you must at least be able to read in buffers of this specific
* frame size.
*
* @return uint32_t
*/
uint32_t maxFrame() const
{
return _implementation.maxFrame();
}
/**
* Expected number of bytes for the next parse() call.
*
* This method returns the minimum number of bytes that the next call to parse() needs
* in order to do something meaningful.
*
* @return uint32_t
*/
uint32_t expected() const
{
return _implementation.expected();
}
/**
* Is the connection ready to accept instructions (has it passed the login handshake and not yet been closed)?
* @return bool
*/
bool ready() const
{
return _implementation.ready();
}
/**
* Is (or was) the connection initialized
* @return bool
*/
bool initialized() const
{
return _implementation.initialized();
}
/**
* Is the connection in a usable state, or is it already closed or
* in the process of being closed?
* @return bool
*/
bool usable() const
{
return _implementation.usable();
}
/**
* Close the connection
* This will close all channels
* @return bool
*/
bool close()
{
return _implementation.close();
}
/**
* Retrieve the number of channels that are active for this connection
* @return std::size_t
*/
std::size_t channels() const
{
return _implementation.channels();
}
/**
* Is the connection busy waiting for an answer from the server? (in the
* meantime you can already send more instructions over it)
* @return bool
*/
bool waiting() const
{
return _implementation.waiting();
}
/**
* Some classes have access to private properties
*/
friend class ChannelImpl;
};
/**
* End of namespace
*/
}