1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
|
/*
* How to exploit the wonders of libevhtp's threading model to avoid using
* libevent's locking API.
*
* In this example we use Redis's Async API (Libhiredis) store and retr the following
* information for a request:
*
* Total requests seen.
* Total requests seen by the requestors IP address.
* All of the source ports seen used by the requestors IP address.
*
* We do this all using libevhtp's builtin thread-pool model, without the use of
* mutexes or evthread_use_pthreads() type stuff.
*
* The technique is simple:
* 1. Create your evhtp_t structure, assign callbacks like usual.
* 2. Call evhtp_use_threads() with a thread init callback.
* 3. Each time a thread starts, the thread init callback you defined will be
* called with information about that thread.
*
* First a bit of information about how evhtp does threading:
* libevhtp uses the evthr library, which works more like a threaded
* co-routine than a threadpool. Each evthr in a pool has its own unique
* event_base (and each evthr runs its own event_base_loop()). Under the
* hood when libevhtp sends a request to a thread, it calls
* "evthr_pool_defer(pool, _run_connection_in_thread, ...).
*
* The evthr library then finds a thread inside the pool with the lowest backlog,
* sends a packet over that threads socketpair containing information about what
* function to execute. It uses socketpairs because they can be treated as
* an event, thus able to be processed in a threads own unique
* event_base_loop().
*
* Knowing that, a connection in evhtp is never associated with the initial
* event_base that was passed to evhtp_new(), but instead the connection
* uses the evthr's unique event_base. This is what makes libevhtp's
* safe from thread-related race conditions.
*
* 4. Use the thread init callback as a place to put event type things on the
* threads event_base() instead of using the global one.
*
* In this code, that function is app_init_thread(). When this function is
* called, the first argument is the evthr_t of the thread that just
* started. This function uses "evthr_get_base(thread)" to get the
* event_base associated with this specific thread.
*
* Using that event_base, the function will start up an async redis
* connection. This redis connection is now tied to that thread, and can be
* used on a threaded request without locking (remember that your request
* has the same event_base as the thread it was executed in).
*
* We allocate a dummy structure "struct app" and then call
* "evthr_set_aux(thread, app)". This function sets some aux data which can
* be fetched at any point using evthr_get_aux(thread). We use this later on
* inside process_request()
*
* This part is the secret to evhtp threading success.
*
* 5. When a request has been fully processed, it will call the function
* "app_process_request()". Note here that the "arg" argument is NULL since no
* arguments were passed to evhtp_set_gencb().
*
* Since we want to do a bunch of redis stuff before sending a reply to the
* client, we must fetch the "struct app" data we allocated and set for the
* thread associated with this request (struct app * app =
* evthr_get_aux(thread);).
*
* struct app has our thread-specific redis connection ctx, so using that
* redisAsyncCommand() is called a bunch of times to queue up the commands
* which will be run.
*
* The last part of this technique is to call the function
* "evhtp_request_pause()". This essentially tells evhtp to flip the
* read-side of the connections file-descriptor OFF (This avoids potential
* situations where a client disconnected before all of the redis commands
* executed).
*
* 6. Each redis command is executed in order, and each callback will write to
* the requests output_buffer with relevant information from the result.
*
* 7. The last redis callback executed here is "redis_get_srcport_cb". It is
* the job os this function to call evhtp_send_reply() and then
* evhtp_request_resume().
*
* Using this design in conjunction with libevhtp makes the world an easier
* place to code.
*
* Compile: gcc thread_design.c -o thread_design -levhtp -levent -lhiredis
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <errno.h>
#include <evhtp.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>
struct app_parent {
evhtp_t * evhtp;
evbase_t * evbase;
char * redis_host;
uint16_t redis_port;
};
struct app {
struct app_parent * parent;
evbase_t * evbase;
redisAsyncContext * redis;
};
static evthr_t *
get_request_thr(evhtp_request_t * request) {
evhtp_connection_t * htpconn;
evthr_t * thread;
htpconn = evhtp_request_get_connection(request);
thread = htpconn->thread;
return thread;
}
void
redis_global_incr_cb(redisAsyncContext * redis, void * redis_reply, void * arg) {
redisReply * reply = redis_reply;
evhtp_request_t * request = arg;
printf("global_incr_cb(%p)\n", request);
if (reply == NULL || reply->type != REDIS_REPLY_INTEGER) {
evbuffer_add_printf(request->buffer_out,
"redis_global_incr_cb() failed\n");
return;
}
evbuffer_add_printf(request->buffer_out,
"Total requests = %lld\n", reply->integer);
}
void
redis_srcaddr_incr_cb(redisAsyncContext * redis, void * redis_reply, void * arg) {
redisReply * reply = redis_reply;
evhtp_request_t * request = arg;
printf("incr_cb(%p)\n", request);
if (reply == NULL || reply->type != REDIS_REPLY_INTEGER) {
evbuffer_add_printf(request->buffer_out,
"redis_srcaddr_incr_cb() failed\n");
return;
}
evbuffer_add_printf(request->buffer_out,
"Requests from this source IP = %lld\n", reply->integer);
}
void
redis_set_srcport_cb(redisAsyncContext * redis, void * redis_reply, void * arg) {
redisReply * reply = redis_reply;
evhtp_request_t * request = arg;
printf("set_srcport_cb(%p)\n", request);
if (reply == NULL || reply->type != REDIS_REPLY_INTEGER) {
evbuffer_add_printf(request->buffer_out,
"redis_set_srcport_cb() failed\n");
return;
}
if (!reply->integer) {
evbuffer_add_printf(request->buffer_out,
"This source port has been seen already.\n");
} else {
evbuffer_add_printf(request->buffer_out,
"This source port has never been seen.\n");
}
}
void
redis_get_srcport_cb(redisAsyncContext * redis, void * redis_reply, void * arg) {
redisReply * reply = redis_reply;
evhtp_request_t * request = arg;
int i;
printf("get_srcport_cb(%p)\n", request);
if (reply == NULL || reply->type != REDIS_REPLY_ARRAY) {
evbuffer_add_printf(request->buffer_out,
"redis_get_srcport_cb() failed.\n");
return;
}
evbuffer_add_printf(request->buffer_out,
"source ports which have been seen for your ip:\n");
for (i = 0; i < reply->elements; i++) {
redisReply * elem = reply->element[i];
evbuffer_add_printf(request->buffer_out, "%s ", elem->str);
}
evbuffer_add(request->buffer_out, "\n", 1);
/* final callback for redis, so send the response */
evhtp_send_reply(request, EVHTP_RES_OK);
evhtp_request_resume(request);
}
void
app_process_request(evhtp_request_t * request, void * arg) {
struct sockaddr_in * sin;
struct app_parent * app_parent;
struct app * app;
evthr_t * thread;
evhtp_connection_t * conn;
char tmp[1024];
printf("process_request(%p)\n", request);
thread = get_request_thr(request);
conn = evhtp_request_get_connection(request);
app = (struct app *)evthr_get_aux(thread);
sin = (struct sockaddr_in *)conn->saddr;
evutil_inet_ntop(AF_INET, &sin->sin_addr, tmp, sizeof(tmp));
/* increment a global counter of hits on redis */
redisAsyncCommand(app->redis, redis_global_incr_cb,
(void *)request, "INCR requests:total");
/* increment a counter for hits from this source address on redis */
redisAsyncCommand(app->redis, redis_srcaddr_incr_cb,
(void *)request, "INCR requests:ip:%s", tmp);
/* add the source port of this request to a source-specific set */
redisAsyncCommand(app->redis, redis_set_srcport_cb, (void *)request,
"SADD requests:ip:%s:ports %d", tmp, ntohs(sin->sin_port));
/* get all of the ports this source address has used */
redisAsyncCommand(app->redis, redis_get_srcport_cb, (void *)request,
"SMEMBERS requests:ip:%s:ports", tmp);
/* pause the request processing */
evhtp_request_pause(request);
}
void
app_init_thread(evhtp_t * htp, evthr_t * thread, void * arg) {
struct app_parent * app_parent;
struct app * app;
app_parent = (struct app_parent *)arg;
app = calloc(sizeof(struct app), 1);
app->parent = app_parent;
app->evbase = evthr_get_base(thread);
app->redis = redisAsyncConnect(app_parent->redis_host, app_parent->redis_port);
redisLibeventAttach(app->redis, app->evbase);
evthr_set_aux(thread, app);
}
int
main(int argc, char ** argv) {
evbase_t * evbase;
evhtp_t * evhtp;
struct app_parent * app_p;
evbase = event_base_new();
evhtp = evhtp_new(evbase, NULL);
app_p = calloc(sizeof(struct app_parent), 1);
app_p->evhtp = evhtp;
app_p->evbase = evbase;
app_p->redis_host = "127.0.0.1";
app_p->redis_port = 6379;
evhtp_set_gencb(evhtp, app_process_request, NULL);
evhtp_use_threads(evhtp, app_init_thread, 4, app_p);
evhtp_bind_socket(evhtp, "127.0.0.1", 9090, 1024);
event_base_loop(evbase, 0);
return 0;
}
|