1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531
|
/**
* LibEV.h
*
* Implementation of the AMQP::TcpHandler that is optimized for libev. You can
* use this class instead of an AMQP::TcpHandler class: just pass the event loop
* to the constructor and you're all set
*
* Compile with: "g++ -std=c++11 libev.cpp -lamqpcpp -lev -lpthread"
*
* @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
* @copyright 2015 - 2023 Copernica BV
*/
/**
* Include guard
*/
#pragma once
/**
* Dependencies
*/
#include <ev.h>
#include <algorithm>
#include <cstdint>
#include <list>
#include "amqpcpp/linux_tcp.h"
/**
* Set up namespace
*/
namespace AMQP {
/**
* Class definition
*/
class LibEvHandler : public TcpHandler
{
private:
/**
* Internal interface for the object that is being watched
*/
class Watchable
{
public:
    /**
     *  Virtual destructor, so that an implementation is always destructed
     *  correctly, also when it is accessed through a pointer to this interface
     */
    virtual ~Watchable() = default;

    /**
     *  The method that is called when a filedescriptor becomes active
     *  @param  fd          the filedescriptor that is active
     *  @param  events      the events that are active
     */
    virtual void onActive(int fd, int events) = 0;

    /**
     *  Method that is called when the timer expires
     */
    virtual void onExpired() = 0;
};
/**
* Helper class that wraps a libev I/O watcher
*/
class Watcher
{
private:
    /**
     *  Event loop in which the watcher is registered
     *  @var    struct ev_loop
     */
    struct ev_loop *_loop;

    /**
     *  The libev I/O watcher that is managed by this object
     *  @var    struct ev_io
     */
    struct ev_io _io;

    /**
     *  Function that is invoked by libev when the filedescriptor becomes active
     *  @param  loop        The loop in which the event was triggered
     *  @param  watcher     The libev watcher that was triggered
     *  @param  revents     Events triggered
     */
    static void callback(struct ev_loop *loop, struct ev_io *watcher, int revents)
    {
        // the data pointer holds the watched object, which is notified about the activity
        static_cast<Watchable*>(watcher->data)->onActive(watcher->fd, revents);
    }

public:
    /**
     *  Constructor, this immediately starts the watcher
     *  @param  loop        The current event loop
     *  @param  object      The object that is notified when the filedescriptor is active
     *  @param  fd          The filedescriptor being watched
     *  @param  events      The events that should be monitored
     *  @param  priority    The priority for the watcher
     */
    Watcher(struct ev_loop *loop, Watchable *object, int fd, int events, int priority) : _loop(loop)
    {
        // prepare the libev structure for the filedescriptor and the events
        ev_io_init(&_io, callback, fd, events);

        // remember the watched object, the callback retrieves it from the data pointer
        _io.data = object;

        // apply the requested priority
        ev_set_priority(&_io, priority);

        // activate the watcher
        ev_io_start(_loop, &_io);
    }

    /**
     *  No copying and no moving: libev holds a pointer to the embedded
     *  ev_io structure for as long as the watcher is started
     *
     *  @param  that        The object to not copy or move
     */
    Watcher(const Watcher &that) = delete;
    Watcher(Watcher &&that) = delete;

    /**
     *  Destructor, removes the watcher from the event loop
     */
    virtual ~Watcher()
    {
        // libev should no longer monitor the filedescriptor
        ev_io_stop(_loop, &_io);
    }

    /**
     *  Is this the watcher for a certain filedescriptor?
     *  @param  fd          The filedescriptor to compare with
     *  @return bool
     */
    bool contains(int fd) const
    {
        // compare with the filedescriptor stored in the libev structure
        return fd == _io.fd;
    }

    /**
     *  Install a different set of events for the monitored filedescriptor
     *  @param  events      The new events to monitor
     */
    void events(int events)
    {
        // the watcher is taken out of the loop while it is being changed
        ev_io_stop(_loop, &_io);

        // same filedescriptor, different events
        ev_io_set(&_io, _io.fd, events);

        // put the watcher back in the loop
        ev_io_start(_loop, &_io);
    }
};
/**
* Wrapper around a connection, this will monitor the filedescriptors
* and run a timer to send out heartbeats
*/
class Wrapper : private Watchable
{
private:
    /**
     *  The connection that is monitored
     *  @var    TcpConnection
     */
    TcpConnection *_connection;

    /**
     *  The event loop to which it is attached
     *  @var    struct ev_loop
     */
    struct ev_loop *_loop;

    /**
     *  The watcher for the timer
     *  @var    struct ev_timer
     */
    struct ev_timer _timer;

    /**
     *  IO-watchers to monitor filedescriptors
     *  @var    std::list
     */
    std::list<Watcher> _watchers;

    /**
     *  When should we send the next heartbeat?
     *  @var    ev_tstamp
     */
    ev_tstamp _next = 0.0;

    /**
     *  When does the connection expire / has the server been idle for too long?
     *  During connection setup, this member is used as the connect-timeout.
     *  @var    ev_tstamp
     */
    ev_tstamp _expire;

    /**
     *  Timeout after which the connection is no longer considered alive.
     *  A heartbeat must be sent every _timeout / 2 seconds.
     *  Value zero means heartbeats are disabled, or not yet negotiated.
     *  @var    uint16_t
     */
    uint16_t _timeout = 0;

    /**
     *  Callback method that is called by libev when the timer expires
     *  @param  loop        The loop in which the event was triggered
     *  @param  timer       Internal timer object
     *  @param  revents     The events that triggered this call
     */
    static void callback(struct ev_loop *loop, struct ev_timer *timer, int revents)
    {
        // retrieve the object (the constructor stored it as a Watchable pointer)
        Watchable *object = static_cast<Watchable*>(timer->data);

        // tell the object that it expired
        object->onExpired();
    }

    /**
     *  Do we need timers / is this a timed monitor?
     *  @return bool
     */
    bool timed() const
    {
        // the timer is in use for as long as at least one of the two moments is set
        return _expire > 0.0 || _next > 0.0;
    }

    /**
     *  Method that is called when the timer expired
     */
    virtual void onExpired() override
    {
        // get the current time
        ev_tstamp now = ev_now(_loop);

        // timer is no longer active, so the refcounter in the loop is restored
        ev_ref(_loop);

        // if the onNegotiate method was not yet called, and no heartbeat timeout was negotiated
        if (_timeout == 0)
        {
            // this can happen in three situations: 1. a connect-timeout, 2. user space has
            // told us that we're not interested in heartbeats, 3. rabbitmq does not want heartbeats,
            // in either case we're no longer going to run further timers.
            _next = _expire = 0.0;

            // if we have an initialized connection, user-space must have overridden the onNegotiate
            // method, so we keep using the connection
            if (_connection->initialized()) return;

            // this is a connection timeout, close the connection from our side too
            return (void)_connection->close(true);
        }
        else if (now >= _expire)
        {
            // the server was inactive for a too long period of time, reset state
            _next = _expire = 0.0; _timeout = 0;

            // close the connection because server was inactive (we close it with immediate effect,
            // because it was inactive so we cannot trust it to respect the AMQP close handshake)
            return (void)_connection->close(true);
        }
        else if (now >= _next)
        {
            // it's time for the next heartbeat
            _connection->heartbeat();

            // remember when we should send out the next one, so the next one should be
            // sent only after _timeout/2 seconds again _from now_ (no catching up)
            _next = now + std::max(_timeout / 2, 1);
        }

        // reset the timer to trigger again later
        ev_timer_set(&_timer, std::min(_next, _expire) - now, 0.0);

        // and start it again
        ev_timer_start(_loop, &_timer);

        // and because the timer is running again, it should once more not keep the loop active
        ev_unref(_loop);
    }

    /**
     *  Method that is called when a filedescriptor becomes active
     *  @param  fd          the filedescriptor that is active
     *  @param  events      the events that are active (readable/writable)
     */
    virtual void onActive(int fd, int events) override
    {
        // if the server is readable, we have some extra time before it expires, the expire time
        // is set to 1.5 * _timeout to close the connection when the third heartbeat is about to be sent
        if (_timeout != 0 && (events & EV_READ)) _expire = ev_now(_loop) + _timeout * 1.5;

        // pass on to the connection
        _connection->process(fd, events);
    }

public:
    /**
     *  Constructor
     *  @param  loop        The current event loop
     *  @param  connection  The TCP connection
     *  @param  timeout     Connect timeout
     *  @param  priority    The priority (high priorities are invoked earlier)
     */
    Wrapper(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout, int priority) :
        _connection(connection),
        _loop(loop),
        _next(0.0),
        _expire(ev_now(loop) + timeout),
        _timeout(0)
    {
        // store the object in the data "void*", explicitly converted to a Watchable pointer
        // because that is the exact type to which the callback casts the "void*" back
        _timer.data = static_cast<Watchable*>(this);

        // initialize the libev structure, it should expire after the connection timeout
        ev_timer_init(&_timer, callback, timeout, 0.0);

        // set a priority
        ev_set_priority(&_timer, priority);

        // start the timer (this is the time that we reserve for setting up the connection)
        ev_timer_start(_loop, &_timer);

        // the timer should not keep the event loop active
        ev_unref(_loop);
    }

    /**
     *  Wrappers cannot be copied or moved
     *
     *  @param  that        The object to not move or copy
     */
    Wrapper(Wrapper &&that) = delete;
    Wrapper(const Wrapper &that) = delete;

    /**
     *  Destructor
     */
    virtual ~Wrapper()
    {
        // the timer was already stopped
        if (!timed()) return;

        // stop the timer
        ev_timer_stop(_loop, &_timer);

        // restore loop refcount
        ev_ref(_loop);
    }

    /**
     *  Start the timer (and expose the interval)
     *  @param  timeout     the heartbeat timeout proposed by the server
     *  @return uint16_t    the heartbeat timeout that we accepted
     */
    uint16_t start(uint16_t timeout)
    {
        // we now know for sure that the connection was set up
        _timeout = timeout;

        // if heartbeats are disabled we do not have to set it
        if (_timeout == 0) return 0;

        // is the timer still in use? if it already expired without being restarted, the
        // onExpired() method has given back the loop reference that goes with a running timer
        const bool running = timed();

        // calculate current time
        auto now = ev_now(_loop);

        // we also know when the next heartbeat should be sent
        _next = now + std::max(_timeout / 2, 1);

        // because the server has just sent us some data, we will update the expire time too
        _expire = now + _timeout * 1.5;

        // stop the existing timer (we have to stop it and restart it, because ev_timer_set()
        // on its own does not change the running timer)
        ev_timer_stop(_loop, &_timer);

        // find the earliest thing that expires
        ev_timer_set(&_timer, std::min(_next, _expire) - now, 0.0);

        // and start it again
        ev_timer_start(_loop, &_timer);

        // a timer that was still in use keeps the ev_unref() call that was made when it was
        // started, but for a timer that we just brought back to life we have to make sure
        // (just like in the constructor) that it does not keep the event loop active
        if (!running) ev_unref(_loop);

        // expose the accepted interval
        return _timeout;
    }

    /**
     *  Check if the timer is associated with a certain connection
     *  @param  connection
     *  @return bool
     */
    bool contains(const AMQP::TcpConnection *connection) const
    {
        // compare the connections
        return _connection == connection;
    }

    /**
     *  Monitor a filedescriptor
     *  @param  fd          the filedescriptor to monitor
     *  @param  events      the events to monitor, zero to stop monitoring the filedescriptor
     */
    void monitor(int fd, int events)
    {
        // should we remove?
        if (events == 0)
        {
            // remove the io-watcher
            _watchers.remove_if([fd](const Watcher &watcher) -> bool {

                // compare filedescriptors
                return watcher.contains(fd);
            });
        }
        else
        {
            // look in the list for this filedescriptor
            for (auto &watcher : _watchers)
            {
                // do we have a match?
                if (watcher.contains(fd)) return watcher.events(events);
            }

            // we need a watcher (the conversion is done here, because the base class is private)
            Watchable *watchable = this;

            // we should monitor a new filedescriptor
            _watchers.emplace_back(_loop, watchable, fd, events, ev_priority(&_timer));
        }
    }
};
/**
* The event loop
* @var struct ev_loop*
*/
struct ev_loop *_loop;
/**
* Each connection is wrapped
* @var std::list
*/
std::list<Wrapper> _wrappers;
/**
* The priority that watchers should have (higher priorities are invoked earlier by libev)
* @var int
*/
int _priority;
/**
* Lookup a connection-wrapper, when the wrapper is not found, we construct one
* @param connection
* @return Wrapper
*/
Wrapper &lookup(TcpConnection *connection)
{
    // walk over the known wrappers to see if the connection is already wrapped
    for (auto iter = _wrappers.begin(); iter != _wrappers.end(); ++iter)
    {
        // found it, no new wrapper is needed
        if (iter->contains(connection)) return *iter;
    }

    // the connection is not yet known: wrap it (60 is passed as connect timeout)
    _wrappers.emplace_back(_loop, connection, 60, _priority);

    // the new wrapper was appended to the end of the list
    return _wrappers.back();
}
protected:
/**
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
* @param connection The TCP connection object that is reporting
* @param fd The filedescriptor to be monitored
* @param flags Should the object be monitored for readability or writability?
*/
virtual void monitor(TcpConnection *connection, int fd, int flags) override
{
    // find (or create) the wrapper that belongs to the connection
    Wrapper &wrapper = lookup(connection);

    // the wrapper takes care of the actual monitoring
    wrapper.monitor(fd, flags);
}
/**
* Method that is called when the heartbeat timeout is negotiated between the server and the client.
* @param connection The connection that suggested a heartbeat timeout
* @param timeout The suggested timeout from the server
* @return uint16_t The timeout to use
*/
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t timeout) override
{
    // find (or create) the wrapper that belongs to the connection
    Wrapper &wrapper = lookup(connection);

    // the wrapper starts its timer to check for activity and to send heartbeats,
    // and it reports the timeout that it accepted
    const uint16_t accepted = wrapper.start(timeout);

    // this is the timeout to use
    return accepted;
}
/**
* Method that is called when the TCP connection is destructed
* @param connection The TCP connection
*/
virtual void onDetached(TcpConnection *connection) override
{
    // walk over all wrappers, and drop each one that belongs to the connection
    for (auto iter = _wrappers.begin(); iter != _wrappers.end(); )
    {
        // erase() returns the position of the next wrapper in the list
        if (iter->contains(connection)) iter = _wrappers.erase(iter);

        // not a match, move on to the next wrapper
        else ++iter;
    }
}
public:
/**
* Constructor
* @param loop The event loop to wrap
* @param priority The libev priority (higher priorities are invoked earlier)
*/
LibEvHandler(struct ev_loop *loop, int priority = 0) :
    _loop(loop),
    _priority(priority)
{
    // nothing else to do, wrappers are created when connections report themselves
}
/**
* Destructor
*/
// nothing to clean up explicitly: the wrappers in the list are destructed together with
// the handler, and their destructors stop the timers and the I/O watchers
virtual ~LibEvHandler() = default;
};
/**
* End of namespace
*/
}
|