File: libev.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (531 lines) | stat: -rw-r--r-- 16,976 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
/**
 *  LibEV.h
 *
 *  Implementation for the AMQP::TcpHandler that is optimized for libev. You can
 *  use this class instead of a AMQP::TcpHandler class, just pass the event loop
 *  to the constructor and you're all set
 *
 *  Compile with: "g++ -std=c++11 libev.cpp -lamqpcpp -lev -lpthread"
 *
 *  @author Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
 *  @copyright 2015 - 2023 Copernica BV
 */

/**
 *  Include guard
 */
#pragma once

/**
 *  Dependencies
 */
#include <ev.h>
#include <list>
#include "amqpcpp/linux_tcp.h"

/**
 *  Set up namespace
 */
namespace AMQP {

/**
 *  Class definition
 */
class LibEvHandler : public TcpHandler
{
private:
    /**
     *  Internal interface for the object that is being watched
     */
    class Watchable
    {
    public:
        /**
         *  The method that is called when a filedescriptor becomes active
         *  @param  fd
         *  @param  events
         */
        virtual void onActive(int fd, int events) = 0;
        
        /**
         *  Method that is called when the timer expires
         */
        virtual void onExpired() = 0;
    };

    /**
     *  Helper class that wraps a libev I/O watcher
     */
    class Watcher
    {
    private:
        /**
         *  The event loop to which it is attached
         *  @var struct ev_loop
         */
        struct ev_loop *_loop;

        /**
         *  The actual watcher structure
         *  @var struct ev_io
         */
        struct ev_io _io;

        /**
         *  Callback method that is called by libev when a filedescriptor becomes active
         *  @param  loop        The loop in which the event was triggered
         *  @param  w           Internal watcher object
         *  @param  revents     Events triggered
         */
        static void callback(struct ev_loop *loop, struct ev_io *watcher, int revents)
        {
            // retrieve the watched object
            Watchable *object = static_cast<Watchable*>(watcher->data);

            // tell the object that its filedescriptor is active
            object->onActive(watcher->fd, revents);
        }

    public:
        /**
         *  Constructor
         *  @param  loop            The current event loop
         *  @param  object          The object being watched
         *  @param  fd              The filedescriptor being watched
         *  @param  events          The events that should be monitored
         *  @param  priority        The priority for the watcher
         */
        Watcher(struct ev_loop *loop, Watchable *object, int fd, int events, int priority) : _loop(loop)
        {
            // initialize the libev structure
            ev_io_init(&_io, callback, fd, events);
            
            // install a priority
            ev_set_priority(&_io, priority);

            // store the object in the data "void*"
            _io.data = object;

            // start the watcher
            ev_io_start(_loop, &_io);
        }

        /**
         *  Watchers cannot be copied or moved
         *
         *  @param  that    The object to not move or copy
         */
        Watcher(Watcher &&that) = delete;
        Watcher(const Watcher &that) = delete;

        /**
         *  Destructor
         */
        virtual ~Watcher()
        {
            // stop the watcher
            ev_io_stop(_loop, &_io);
        }
        
        /**
         *  Check if a filedescriptor is covered by the watcher
         *  @param  fd
         *  @return bool
         */
        bool contains(int fd) const { return _io.fd == fd; }

        /**
         *  Change the events for which the filedescriptor is monitored
         *  @param  events
         */
        void events(int events)
        {
            // stop the watcher if it was active
            ev_io_stop(_loop, &_io);

            // set the events
            ev_io_set(&_io, _io.fd, events);

            // and restart it
            ev_io_start(_loop, &_io);
        }
    };

    /**
     *  Wrapper around a connection, this will monitor the filedescriptors
     *  and run a timer to send out heartbeats
     */
    class Wrapper : private Watchable
    {
    private:
        /**
         *  The connection that is monitored
         *  @var TcpConnection
         */
        TcpConnection *_connection;
    
        /**
         *  The event loop to which it is attached
         *  @var struct ev_loop
         */
        struct ev_loop *_loop;

        /**
         *  The watcher for the timer
         *  @var struct ev_io
         */
        struct ev_timer _timer;
        
        /**
         *  IO-watchers to monitor filedescriptors
         *  @var std::list
         */
        std::list<Watcher> _watchers;
        
        /**
         *  When should we send the next heartbeat?
         *  @var ev_tstamp
         */
        ev_tstamp _next = 0.0;
        
        /**
         *  When does the connection expire / was the server for a too longer period of time idle?
         *  During connection setup, this member is used as the connect-timeout.
         *  @var ev_tstamp
         */
        ev_tstamp _expire;
        
        /**
         *  Timeout after which the connection is no longer considered alive.
         *  A heartbeat must be sent every _timeout / 2 seconds.
         *  Value zero means heartbeats are disabled, or not yet negotiated.
         *  @var uint16_t
         */
        uint16_t _timeout = 0;

        /**
         *  Callback method that is called by libev when the timer expires
         *  @param  loop        The loop in which the event was triggered
         *  @param  timer       Internal timer object
         *  @param  revents     The events that triggered this call
         */
        static void callback(struct ev_loop *loop, struct ev_timer *timer, int revents)
        {
            // retrieve the object
            Watchable *object = static_cast<Watchable*>(timer->data);

            // tell the object that it expired
            object->onExpired();
        }
        
        /**
         *  Do we need timers / is this a timed monitor?
         *  @return bool
         */
        bool timed() const
        {
            // if neither timers are set
            return _expire > 0.0 || _next > 0.0;
        }
        
        /**
         *  Method that is called when the timer expired
         */
        virtual void onExpired() override
        {
            // get the current time
            ev_tstamp now = ev_now(_loop);
            
            // timer is no longer active, so the refcounter in the loop is restored
            ev_ref(_loop);

            // if the onNegotiate method was not yet called, and no heartbeat timeout was negotiated
            if (_timeout == 0)
            {
                // this can happen in three situations: 1. a connect-timeout, 2. user space has
                // told us that we're not interested in heartbeats, 3. rabbitmq does not want heartbeats,
                // in either case we're no longer going to run further timers.
                _next = _expire = 0.0;
                
                // if we have an initialized connection, user-space must have overridden the onNegotiate
                // method, so we keep using the connection
                if (_connection->initialized()) return;

                // this is a connection timeout, close the connection from our side too
                return (void)_connection->close(true);
            }
            else if (now >= _expire)
            {
                // the server was inactive for a too long period of time, reset state
                _next = _expire = 0.0; _timeout = 0;
                
                // close the connection because server was inactive (we close it with immediate effect,
                // because it was inactive so we cannot trust it to respect the AMQP close handshake)
                return (void)_connection->close(true);
            }
            else if (now >= _next)
            {
                // it's time for the next heartbeat
                _connection->heartbeat();
                
                // remember when we should send out the next one, so the next one should be 
                // sent only after _timout/2 seconds again _from now_ (no catching up)
                _next = now + std::max(_timeout / 2, 1);
            }

            // reset the timer to trigger again later
            ev_timer_set(&_timer, std::min(_next, _expire) - now, 0.0);

            // and start it again
            ev_timer_start(_loop, &_timer);
            
            // and because the timer is running again, we restore the refcounter
            ev_unref(_loop);
        }
        
        /**
         *  Method that is called when a filedescriptor becomes active
         *  @param  fd          the filedescriptor that is active
         *  @param  events      the events that are active (readable/writable)
         */
        virtual void onActive(int fd, int events) override
        {
            // if the server is readable, we have some extra time before it expires, the expire time 
            // is set to 1.5 * _timeout to close the connection when the third heartbeat is about to be sent
            if (_timeout != 0 && (events & EV_READ)) _expire = ev_now(_loop) + _timeout * 1.5;
            
            // pass on to the connection
            _connection->process(fd, events);
        }

    public:
        /**
         *  Constructor
         *  @param  loop            The current event loop
         *  @param  connection      The TCP connection
         *  @param  timeout         Connect timeout
         *  @param  priority        The priority (high priorities are invoked earlier
         */
        Wrapper(struct ev_loop *loop, AMQP::TcpConnection *connection, uint16_t timeout, int priority) : 
            _connection(connection),
            _loop(loop),
            _next(0.0),
            _expire(ev_now(loop) + timeout),
            _timeout(0)
        {
            // store the object in the data "void*"
            _timer.data = this;
            
            // initialize the libev structure, it should expire after the connection timeout
            ev_timer_init(&_timer, callback, timeout, 0.0);

            // set a priority
            ev_set_priority(&_timer, priority);

            // start the timer (this is the time that we reserve for setting up the connection)
            ev_timer_start(_loop, &_timer);

            // the timer should not keep the event loop active
            ev_unref(_loop);
        }

        /**
         *  Watchers cannot be copied or moved
         *
         *  @param  that    The object to not move or copy
         */
        Wrapper(Wrapper &&that) = delete;
        Wrapper(const Wrapper &that) = delete;

        /**
         *  Destructor
         */
        virtual ~Wrapper()
        {
            // the timer was already stopped
            if (!timed()) return;

            // stop the timer
            ev_timer_stop(_loop, &_timer);

            // restore loop refcount
            ev_ref(_loop);
        }

        /**
         *  Start the timer (and expose the interval)
         *  @param  interval        the heartbeat interval proposed by the server
         *  @return uint16_t        the heartbeat interval that we accepted
         */
        uint16_t start(uint16_t timeout)
        {
            // we now know for sure that the connection was set up
            _timeout = timeout;
            
            // if heartbeats are disabled we do not have to set it
            if (_timeout == 0) return 0;
            
            // calculate current time
            auto now = ev_now(_loop);
            
            // we also know when the next heartbeat should be sent
            _next = now + std::max(_timeout / 2, 1);
            
            // because the server has just sent us some data, we will update the expire time too
            _expire = now + _timeout * 1.5;

            // stop the existing timer (we have to stop it and restart it, because ev_timer_set() 
            // on its own does not change the running timer) (note that we assume that the timer
            // is already running and keeps on running, so no calls to ev_ref()/en_unref() here)
            ev_timer_stop(_loop, &_timer);

            // find the earliest thing that expires
            ev_timer_set(&_timer, std::min(_next, _expire) - now, 0.0);

            // and start it again
            ev_timer_start(_loop, &_timer);
            
            // expose the accepted interval
            return _timeout;
        }
        
        /**
         *  Check if the timer is associated with a certain connection
         *  @param  connection
         *  @return bool
         */
        bool contains(const AMQP::TcpConnection *connection) const
        {
            // compare the connections
            return _connection == connection;
        }
        
        /**
         *  Monitor a filedescriptor
         *  @param  fd
         *  @param  events
         */
        void monitor(int fd, int events)
        {
            // should we remove?
            if (events == 0)
            {
                // remove the io-watcher
                _watchers.remove_if([fd](const Watcher &watcher) -> bool {
                    
                    // compare filedescriptors
                    return watcher.contains(fd);
                });
            }
            else
            {
                // look in the array for this filedescriptor
                for (auto &watcher : _watchers)
                {
                    // do we have a match?
                    if (watcher.contains(fd)) return watcher.events(events);
                }
                
                // we need a watcher
                Watchable *watchable = this;
                
                // we should monitor a new filedescriptor
                _watchers.emplace_back(_loop, watchable, fd, events, ev_priority(&_timer));
            }
        }
    };

    /**
     *  The event loop
     *  @var struct ev_loop*
     */
    struct ev_loop *_loop;
    
    /**
     *  Each connection is wrapped
     *  @var std::list
     */
    std::list<Wrapper> _wrappers;
    
    /**
     *  The priority that watchers should have (higher prio means libev gives more prio to this eveht)
     *  @var int
     */
    int _priority;

    /**
     *  Lookup a connection-wrapper, when the wrapper is not found, we construct one
     *  @param  connection
     *  @return Wrapper
     */
    Wrapper &lookup(TcpConnection *connection)
    {
        // look for the appropriate connection
        for (auto &wrapper : _wrappers)
        {
            // do we have a match?
            if (wrapper.contains(connection)) return wrapper;
        }
        
        // add to the wrappers
        _wrappers.emplace_back(_loop, connection, 60, _priority);
        
        // done
        return _wrappers.back();
    }

protected:
    /**
     *  Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
     *  @param  connection  The TCP connection object that is reporting
     *  @param  fd          The filedescriptor to be monitored
     *  @param  flags       Should the object be monitored for readability or writability?
     */
    virtual void monitor(TcpConnection *connection, int fd, int flags) override
    {
        // lookup the appropriate wrapper and start monitoring
        lookup(connection).monitor(fd, flags);
    }

    /**
     *  Method that is called when the heartbeat timeout is negotiated between the server and the client. 
     *  @param  connection      The connection that suggested a heartbeat timeout
     *  @param  timeout         The suggested timeout from the server
     *  @return uint16_t        The timeout to use
     */
    virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t timeout) override
    {
        // lookup the wrapper, and start the timer to check for activity and send heartbeats
        return lookup(connection).start(timeout);
    }

    /**
     *  Method that is called when the TCP connection is destructed
     *  @param  connection  The TCP connection
     */
    virtual void onDetached(TcpConnection *connection) override
    {
        // remove from the array
        _wrappers.remove_if([connection](const Wrapper &wrapper) -> bool {
            return wrapper.contains(connection);
        });
    }

public:
    /**
     *  Constructor
     *  @param  loop        The event loop to wrap
     *  @param  priority    The libev priority (higher priorities are invoked earlier)
     */
    LibEvHandler(struct ev_loop *loop, int priority = 0) : _loop(loop), _priority(priority) {}

    /**
     *  Destructor
     */
    virtual ~LibEvHandler() = default;
};

/**
 *  End of namespace
 */
}