File: libevent.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (207 lines) | stat: -rw-r--r-- 6,099 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/**
 *  LibEvent.h
 *
 *  Implementation for the AMQP::TcpHandler that is optimized for libevent. You can
 *  use this class instead of a AMQP::TcpHandler class, just pass the event loop
 *  to the constructor and you're all set
 *
 *  Compile with: "g++ -std=c++11 libevent.cpp -lamqpcpp -levent -lpthread"
 *
 *  @author Brent Dimmig <brentdimmig@gmail.com>
 */

/**
 *  Include guard
 */
#pragma once

/**
 *  Dependencies
 */
#include <event2/event.h>
#include <amqpcpp/flags.h>
#include <amqpcpp/linux_tcp.h>

/**
 *  Set up namespace
 */
namespace AMQP {

/**
 *  Class definition
 */
class LibEventHandler : public TcpHandler
{
private:
    /**
     *  Helper class that wraps a libev I/O watcher
     */
    class Watcher
    {
    private:
        /**
         *  The actual event structure
         *  @var struct event
         */
        struct event * _event;

        /**
         *  Callback method that is called by libevent when a filedescriptor becomes active
         *  @param  fd                   The filedescriptor with an event
         *  @param  what                 Events triggered
         *  @param  connection_arg       void * to the connection
         */
        static void callback(evutil_socket_t fd, short what, void *connection_arg)
        {
            // retrieve the connection
            TcpConnection *connection = static_cast<TcpConnection*>(connection_arg);

            // setup amqp flags
            int amqp_flags = 0;
            if (what & EV_READ)
                amqp_flags |= AMQP::readable;
            if (what & EV_WRITE)
                amqp_flags |= AMQP::writable;

            // tell the connection that its filedescriptor is active
            connection->process(fd, amqp_flags);
        }

    public:
        /**
         *  Constructor
         *  @param  evbase          The current event loop
         *  @param  connection      The connection being watched
         *  @param  fd              The filedescriptor being watched
         *  @param  events          The events that should be monitored
         */
        Watcher(struct event_base *evbase, TcpConnection *connection, int fd, int events)
        {
            // setup libevent flags
            short event_flags = EV_PERSIST;
            if (events & AMQP::readable)
                event_flags |= EV_READ;
            if (events & AMQP::writable)
                event_flags |= EV_WRITE;

            // initialize the event

            _event = event_new(evbase, fd, event_flags, callback, connection);
            event_add(_event, nullptr);
        }

        /**
         *  Destructor
         */
        virtual ~Watcher()
        {
            // stop the event
            event_del(_event);
            // free the event
            event_free(_event);
        }

        /**
         *  Change the events for which the filedescriptor is monitored
         *  @param  events
         */
        void events(int events)
        {
            // stop the event if it was active
            event_del(_event);

            // setup libevent flags
            short event_flags = EV_PERSIST;
            if (events & AMQP::readable)
                event_flags |= EV_READ;
            if (events & AMQP::writable)
                event_flags |= EV_WRITE;

            // set the events
            event_assign(_event, event_get_base(_event), event_get_fd(_event), event_flags,
                         event_get_callback(_event), event_get_callback_arg(_event));

            // and restart it
            event_add(_event, nullptr);
        }
    };


    /**
     *  The event loop
     *  @var struct event_base*
     */
    struct event_base *_evbase;

    /**
     *  All I/O watchers that are active, indexed by their filedescriptor
     *  @var std::map<int,Watcher>
     */
    std::map<int,std::unique_ptr<Watcher>> _watchers;


    /**
     *  Method that is called when the heartbeat frequency is negotiated
     *  @param  connection      The connection that suggested a heartbeat interval
     *  @param  interval        The suggested interval from the server
     *  @return uint16_t        The interval to use
     */
    virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override
    {
        // call base (in the highly theoretical case that the base class does something meaningful)
        auto response = TcpHandler::onNegotiate(connection, interval);
        
        // because the LibEvHandler has not yet implemented timers for ensuring that we send
        // some data every couple of seconds, we disabled timeouts
        return 0;
    }

    /**
     *  Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
     *  @param  connection  The TCP connection object that is reporting
     *  @param  fd          The filedescriptor to be monitored
     *  @param  flags       Should the object be monitored for readability or writability?
     */
    virtual void monitor(TcpConnection *connection, int fd, int flags) override
    {
        // do we already have this filedescriptor
        auto iter = _watchers.find(fd);

        // was it found?
        if (iter == _watchers.end())
        {
            // we did not yet have this watcher - but that is ok if no filedescriptor was registered
            if (flags == 0) return;

            // construct a new watcher, and put it in the map
            _watchers[fd] = std::unique_ptr<Watcher>(new Watcher(_evbase, connection, fd, flags));
        }
        else if (flags == 0)
        {
            // the watcher does already exist, but we no longer have to watch this watcher
            _watchers.erase(iter);
        }
        else
        {
            // change the events
            iter->second->events(flags);
        }
    }

public:
    /**
     *  Constructor
     *  @param  evbase  The event loop to wrap
     */
    LibEventHandler(struct event_base *evbase) : _evbase(evbase) {}

    /**
     *  Destructor
     */
    virtual ~LibEventHandler() = default;
};

/**
 *  End of namespace
 */
}