1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
|
/**
* LibEvent.h
*
* Implementation for the AMQP::TcpHandler that is optimized for libevent. You can
* use this class instead of a AMQP::TcpHandler class, just pass the event loop
* to the constructor and you're all set
*
* Compile with: "g++ -std=c++11 libevent.cpp -lamqpcpp -levent -lpthread"
*
* @author Brent Dimmig <brentdimmig@gmail.com>
*/
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include <event2/event.h>
#include <amqpcpp/flags.h>
#include <amqpcpp/linux_tcp.h>
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class definition
*/
class LibEventHandler : public TcpHandler
{
private:
/**
* Helper class that wraps a libev I/O watcher
*/
class Watcher
{
private:
/**
* The actual event structure
* @var struct event
*/
struct event * _event;
/**
* Callback method that is called by libevent when a filedescriptor becomes active
* @param fd The filedescriptor with an event
* @param what Events triggered
* @param connection_arg void * to the connection
*/
static void callback(evutil_socket_t fd, short what, void *connection_arg)
{
// retrieve the connection
TcpConnection *connection = static_cast<TcpConnection*>(connection_arg);
// setup amqp flags
int amqp_flags = 0;
if (what & EV_READ)
amqp_flags |= AMQP::readable;
if (what & EV_WRITE)
amqp_flags |= AMQP::writable;
// tell the connection that its filedescriptor is active
connection->process(fd, amqp_flags);
}
public:
/**
* Constructor
* @param evbase The current event loop
* @param connection The connection being watched
* @param fd The filedescriptor being watched
* @param events The events that should be monitored
*/
Watcher(struct event_base *evbase, TcpConnection *connection, int fd, int events)
{
// setup libevent flags
short event_flags = EV_PERSIST;
if (events & AMQP::readable)
event_flags |= EV_READ;
if (events & AMQP::writable)
event_flags |= EV_WRITE;
// initialize the event
_event = event_new(evbase, fd, event_flags, callback, connection);
event_add(_event, nullptr);
}
/**
* Destructor
*/
virtual ~Watcher()
{
// stop the event
event_del(_event);
// free the event
event_free(_event);
}
/**
* Change the events for which the filedescriptor is monitored
* @param events
*/
void events(int events)
{
// stop the event if it was active
event_del(_event);
// setup libevent flags
short event_flags = EV_PERSIST;
if (events & AMQP::readable)
event_flags |= EV_READ;
if (events & AMQP::writable)
event_flags |= EV_WRITE;
// set the events
event_assign(_event, event_get_base(_event), event_get_fd(_event), event_flags,
event_get_callback(_event), event_get_callback_arg(_event));
// and restart it
event_add(_event, nullptr);
}
};
/**
* The event loop
* @var struct event_base*
*/
struct event_base *_evbase;
/**
* All I/O watchers that are active, indexed by their filedescriptor
* @var std::map<int,Watcher>
*/
std::map<int,std::unique_ptr<Watcher>> _watchers;
/**
* Method that is called when the heartbeat frequency is negotiated
* @param connection The connection that suggested a heartbeat interval
* @param interval The suggested interval from the server
* @return uint16_t The interval to use
*/
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override
{
// call base (in the highly theoretical case that the base class does something meaningful)
auto response = TcpHandler::onNegotiate(connection, interval);
// because the LibEvHandler has not yet implemented timers for ensuring that we send
// some data every couple of seconds, we disabled timeouts
return 0;
}
/**
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
* @param connection The TCP connection object that is reporting
* @param fd The filedescriptor to be monitored
* @param flags Should the object be monitored for readability or writability?
*/
virtual void monitor(TcpConnection *connection, int fd, int flags) override
{
// do we already have this filedescriptor
auto iter = _watchers.find(fd);
// was it found?
if (iter == _watchers.end())
{
// we did not yet have this watcher - but that is ok if no filedescriptor was registered
if (flags == 0) return;
// construct a new watcher, and put it in the map
_watchers[fd] = std::unique_ptr<Watcher>(new Watcher(_evbase, connection, fd, flags));
}
else if (flags == 0)
{
// the watcher does already exist, but we no longer have to watch this watcher
_watchers.erase(iter);
}
else
{
// change the events
iter->second->events(flags);
}
}
public:
/**
* Constructor
* @param evbase The event loop to wrap
*/
LibEventHandler(struct event_base *evbase) : _evbase(evbase) {}
/**
* Destructor
*/
virtual ~LibEventHandler() = default;
};
/**
* End of namespace
*/
}
|