File: libuv.h

package info (click to toggle)
amqp-cpp 4.3.27-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,384 kB
  • sloc: cpp: 10,021; ansic: 191; makefile: 95
file content (224 lines) | stat: -rw-r--r-- 6,163 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
/**
 *  LibUV.h
 *
 *  Implementation for the AMQP::TcpHandler that is optimized for libuv. You can
 *  use this class instead of an AMQP::TcpHandler class; just pass the event loop
 *  to the constructor and you're all set.
 *
 *  Based heavily on the libev.h implementation by Emiel Bruijntjes <emiel.bruijntjes@copernica.com>
 *
 *  @author David Nikdel <david@nikdel.com>
 *  @copyright 2015 Copernica BV
 */

/**
 *  Include guard
 */
#pragma once

/**
 *  Dependencies
 */
#include <uv.h>

#include <map>
#include <memory>

#include "amqpcpp/linux_tcp.h"

/**
 *  Set up namespace
 */
namespace AMQP {

/**
 *  Class definition
 */
class LibUvHandler : public TcpHandler
{
private:
    /**
     *  Helper class that wraps a libuv poll handle for one filedescriptor
     *
     *  The poll handle is allocated separately from the Watcher because
     *  uv_close() completes asynchronously: the handle memory is only freed
     *  from the close callback, which may run after the Watcher is destructed.
     */
    class Watcher
    {
    private:
        /**
         *  The event loop to which it is attached
         *  @var uv_loop_t
         */
        uv_loop_t *_loop;

        /**
         *  The actual poll handle, or nullptr when the handle could not be
         *  initialized (the watcher is then inert)
         *  @var uv_poll_t
         */
        uv_poll_t *_poll;

        /**
         *  Callback method that is called by libuv when a filedescriptor becomes active
         *  @param  handle     Internal poll handle
         *  @param  status     LibUV error code UV_*
         *  @param  events     Events triggered
         */
        static void callback(uv_poll_t *handle, int status, int events)
        {
            // the connection was stored in the handle's user data by the constructor
            TcpConnection *connection = static_cast<TcpConnection*>(handle->data);

            // ask libuv which filedescriptor belongs to this handle (the
            // variable starts out as -1 in case libuv does not fill it in)
            int fd = -1;
            uv_fileno(reinterpret_cast<uv_handle_t*>(handle), &fd);

            // tell the connection that its filedescriptor is active
            connection->process(fd, uv_to_amqp_events(status, events));
        }

    public:
        /**
         *  Constructor
         *
         *  When libuv refuses to initialize the poll handle, the handle is
         *  released right away and the watcher stays inert: events() and the
         *  destructor then do nothing.
         *
         *  @param  loop            The current event loop
         *  @param  connection      The connection being watched
         *  @param  fd              The filedescriptor being watched
         *  @param  events          The events that should be monitored
         */
        Watcher(uv_loop_t *loop, TcpConnection *connection, int fd, int events) :
            _loop(loop),
            _poll(new uv_poll_t())
        {
            // initialize the libuv structure; a handle that failed to
            // initialize must neither be started nor passed to uv_close(),
            // so it is freed directly and the watcher is left inert
            if (uv_poll_init(_loop, _poll, fd) != 0)
            {
                delete _poll;
                _poll = nullptr;
                return;
            }

            // store the connection in the data "void*"
            _poll->data = connection;

            // start the watcher
            uv_poll_start(_poll, amqp_to_uv_events(events), callback);
        }

        /**
         *  Watchers cannot be copied or moved, because the object is the
         *  sole owner of the poll handle
         *
         *  @param  that    The object to not move or copy
         */
        Watcher(Watcher &&that) = delete;
        Watcher(const Watcher &that) = delete;
        Watcher &operator=(Watcher &&that) = delete;
        Watcher &operator=(const Watcher &that) = delete;

        /**
         *  Destructor
         */
        ~Watcher()
        {
            // nothing to clean up when the handle was never initialized
            if (_poll == nullptr) return;

            // stop the watcher
            uv_poll_stop(_poll);

            // close the handle
            uv_close(reinterpret_cast<uv_handle_t*>(_poll), [](uv_handle_t* handle) {
                // delete memory once closed
                delete reinterpret_cast<uv_poll_t*>(handle);
            });
        }

        /**
         *  Change the events for which the filedescriptor is monitored
         *  @param  events      The new set of AMQP event flags
         */
        void events(int events)
        {
            // an inert watcher has no handle to update
            if (_poll == nullptr) return;

            // update the events being watched for
            uv_poll_start(_poll, amqp_to_uv_events(events), callback);
        }

    private:

        /**
         *  Convert event flags from UV format to AMQP format
         *  @param  status      Status that libuv passed to the poll callback
         *  @param  events      UV_READABLE / UV_WRITABLE flags
         *  @return int         Combination of AMQP::readable and AMQP::writable
         */
        static int uv_to_amqp_events(int status, int events)
        {
            // on a non-zero status report both so we pick up the error
            if (status != 0)
                return AMQP::readable | AMQP::writable;

            // map read or write
            int amqp_events = 0;
            if (events & UV_READABLE)
                amqp_events |= AMQP::readable;
            if (events & UV_WRITABLE)
                amqp_events |= AMQP::writable;
            return amqp_events;
        }

        /**
         *  Convert event flags from AMQP format to UV format
         *  @param  events      Combination of AMQP::readable and AMQP::writable
         *  @return int         UV_READABLE / UV_WRITABLE flags
         */
        static int amqp_to_uv_events(int events)
        {
            int uv_events = 0;
            if (events & AMQP::readable)
                uv_events |= UV_READABLE;
            if (events & AMQP::writable)
                uv_events |= UV_WRITABLE;
            return uv_events;
        }
    };


    /**
     *  The event loop
     *  @var uv_loop_t*
     */
    uv_loop_t *_loop;

    /**
     *  All I/O watchers that are active, indexed by their filedescriptor
     *  @var std::map<int,std::unique_ptr<Watcher>>
     */
    std::map<int,std::unique_ptr<Watcher>> _watchers;


    /**
     *  Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
     *  @param  connection  The TCP connection object that is reporting
     *  @param  fd          The filedescriptor to be monitored
     *  @param  flags       Should the object be monitored for readability or writability?
     *                      Zero means that the filedescriptor no longer has to be monitored.
     */
    virtual void monitor(TcpConnection *connection, int fd, int flags) override
    {
        // do we already have this filedescriptor
        auto iter = _watchers.find(fd);

        // was it found?
        if (iter == _watchers.end())
        {
            // we did not yet have this watcher - but that is ok if no filedescriptor was registered
            if (flags == 0) return;

            // construct a new watcher, and put it in the map (the key is known
            // to be absent, so emplace avoids a second lookup plus assignment)
            _watchers.emplace(fd, std::unique_ptr<Watcher>(new Watcher(_loop, connection, fd, flags)));
        }
        else if (flags == 0)
        {
            // the watcher does already exist, but we no longer have to watch this
            // filedescriptor; destructing the watcher stops and closes the poll handle
            _watchers.erase(iter);
        }
        else
        {
            // change the events
            iter->second->events(flags);
        }
    }

public:
    /**
     *  Constructor
     *  @param  loop    The event loop to wrap
     */
    LibUvHandler(uv_loop_t *loop) : _loop(loop) {}

    /**
     *  Destructor
     */
    virtual ~LibUvHandler() = default;
};

/**
 *  End of namespace
 */
}