1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285
|
/*
uWSGI rawrouter
*/
#include "../../uwsgi.h"
#include "../corerouter/cr.h"
static struct uwsgi_rawrouter {
struct uwsgi_corerouter cr;
int xclient;
} urr;
extern struct uwsgi_server uwsgi;
struct rawrouter_session {
struct corerouter_session session;
// XCLIENT ADDR=xxx\r\n
struct uwsgi_buffer *xclient;
size_t xclient_pos;
// placeholder for \r\n
size_t xclient_rn;
};
static struct uwsgi_option rawrouter_options[] = {
{"rawrouter", required_argument, 0, "run the rawrouter on the specified port", uwsgi_opt_undeferred_corerouter, &urr, 0},
{"rawrouter-processes", required_argument, 0, "prefork the specified number of rawrouter processes", uwsgi_opt_set_int, &urr.cr.processes, 0},
{"rawrouter-workers", required_argument, 0, "prefork the specified number of rawrouter processes", uwsgi_opt_set_int, &urr.cr.processes, 0},
{"rawrouter-zerg", required_argument, 0, "attach the rawrouter to a zerg server", uwsgi_opt_corerouter_zerg, &urr, 0},
{"rawrouter-use-cache", optional_argument, 0, "use uWSGI cache as hostname->server mapper for the rawrouter", uwsgi_opt_set_str, &urr.cr.use_cache, 0},
{"rawrouter-use-pattern", required_argument, 0, "use a pattern for rawrouter hostname->server mapping", uwsgi_opt_corerouter_use_pattern, &urr, 0},
{"rawrouter-use-base", required_argument, 0, "use a base dir for rawrouter hostname->server mapping", uwsgi_opt_corerouter_use_base, &urr, 0},
{"rawrouter-fallback", required_argument, 0, "fallback to the specified node in case of error", uwsgi_opt_add_string_list, &urr.cr.fallback, 0},
{"rawrouter-use-code-string", required_argument, 0, "use code string as hostname->server mapper for the rawrouter", uwsgi_opt_corerouter_cs, &urr, 0},
{"rawrouter-use-socket", optional_argument, 0, "forward request to the specified uwsgi socket", uwsgi_opt_corerouter_use_socket, &urr, 0},
{"rawrouter-to", required_argument, 0, "forward requests to the specified uwsgi server (you can specify it multiple times for load balancing)", uwsgi_opt_add_string_list, &urr.cr.static_nodes, 0},
{"rawrouter-gracetime", required_argument, 0, "retry connections to dead static nodes after the specified amount of seconds", uwsgi_opt_set_int, &urr.cr.static_node_gracetime, 0},
{"rawrouter-events", required_argument, 0, "set the maximum number of concurrent events", uwsgi_opt_set_int, &urr.cr.nevents, 0},
{"rawrouter-max-retries", required_argument, 0, "set the maximum number of retries/fallbacks to other nodes", uwsgi_opt_set_int, &urr.cr.max_retries, 0},
{"rawrouter-quiet", required_argument, 0, "do not report failed connections to instances", uwsgi_opt_true, &urr.cr.quiet, 0},
{"rawrouter-cheap", no_argument, 0, "run the rawrouter in cheap mode", uwsgi_opt_true, &urr.cr.cheap, 0},
{"rawrouter-subscription-server", required_argument, 0, "run the rawrouter subscription server on the spcified address", uwsgi_opt_corerouter_ss, &urr, 0},
{"rawrouter-subscription-slot", required_argument, 0, "*** deprecated ***", uwsgi_opt_deprecated, (void *) "useless thanks to the new implementation", 0},
{"rawrouter-timeout", required_argument, 0, "set rawrouter timeout", uwsgi_opt_set_int, &urr.cr.socket_timeout, 0},
{"rawrouter-stats", required_argument, 0, "run the rawrouter stats server", uwsgi_opt_set_str, &urr.cr.stats_server, 0},
{"rawrouter-stats-server", required_argument, 0, "run the rawrouter stats server", uwsgi_opt_set_str, &urr.cr.stats_server, 0},
{"rawrouter-ss", required_argument, 0, "run the rawrouter stats server", uwsgi_opt_set_str, &urr.cr.stats_server, 0},
{"rawrouter-harakiri", required_argument, 0, "enable rawrouter harakiri", uwsgi_opt_set_int, &urr.cr.harakiri, 0},
{"rawrouter-xclient", no_argument, 0, "use the xclient protocol to pass the client addres", uwsgi_opt_true, &urr.xclient, 0},
{"rawrouter-buffer-size", required_argument, 0, "set internal buffer size (default: page size)", uwsgi_opt_set_64bit, &urr.cr.buffer_size, 0},
{0, 0, 0, 0, 0, 0, 0},
};
// write to backend
static ssize_t rr_instance_write(struct corerouter_peer *peer) {
ssize_t len = cr_write(peer, "rr_instance_write()");
// end on empty write
if (!len) return 0;
// the chunk has been sent, start (again) reading from client and instances
if (cr_write_complete(peer)) {
// reset the buffer
peer->out->pos = 0;
cr_reset_hooks(peer);
}
return len;
}
// write to client
static ssize_t rr_write(struct corerouter_peer *main_peer) {
ssize_t len = cr_write(main_peer, "rr_write()");
// end on empty write
if (!len) return 0;
// ok this response chunk is sent, let's start reading again
if (cr_write_complete(main_peer)) {
// reset the buffer
main_peer->out->pos = 0;
cr_reset_hooks(main_peer);
}
return len;
}
// read from backend
static ssize_t rr_instance_read(struct corerouter_peer *peer) {
ssize_t len = cr_read(peer, "rr_instance_read()");
if (!len) return 0;
// set the input buffer as the main output one
peer->session->main_peer->out = peer->in;
peer->session->main_peer->out_pos = 0;
cr_write_to_main(peer, rr_write);
return len;
}
// write the xclient banner
static ssize_t rr_xclient_write(struct corerouter_peer *peer) {
struct corerouter_session *cs = peer->session;
struct rawrouter_session *rr = (struct rawrouter_session *) cs;
ssize_t len = cr_write_buf(peer, rr->xclient, "rr_xclient_write()");
if (!len) return 0;
if (cr_write_complete_buf(peer, rr->xclient)) {
if (peer->session->main_peer->out_pos > 0) {
// (eventually) send previous data
peer->last_hook_read = rr_instance_read;
cr_write_to_main(peer, rr_write);
}
else {
// reset to standard behaviour
peer->in->pos = 0;
cr_reset_hooks_and_read(peer, rr_instance_read);
}
}
return len;
}
// read the first line from the backend and skip it
static ssize_t rr_xclient_read(struct corerouter_peer *peer) {
struct corerouter_session *cs = peer->session;
struct rawrouter_session *rr = (struct rawrouter_session *) cs;
ssize_t len = cr_read(peer, "rr_xclient_read()");
if (!len) return 0;
char *ptr = (peer->in->buf + peer->in->pos) - len;
ssize_t i;
for(i=0;i<len;i++) {
if (rr->xclient_rn == 1) {
if (ptr[i] != '\n') {
return -1;
}
// banner received (skip it, will be sent later)
size_t remains = len - (i+1);
if (remains > 0) {
peer->session->main_peer->out = peer->in;
peer->session->main_peer->out_pos = (peer->in->pos - remains) ;
}
cr_write_to_backend(peer, rr_xclient_write);
return len;
}
else if (ptr[i] == '\r') {
rr->xclient_rn = 1;
}
}
return len;
}
// the instance is connected now we cannot retry connections
static ssize_t rr_instance_connected(struct corerouter_peer *peer) {
struct corerouter_session *cs = peer->session;
struct rawrouter_session *rr = (struct rawrouter_session *) cs;
cr_peer_connected(peer, "rr_instance_connected()");
peer->can_retry = 0;
if (rr->xclient) {
cr_reset_hooks_and_read(peer, rr_xclient_read);
return 1;
}
cr_reset_hooks_and_read(peer, rr_instance_read);
return 1;
}
// read from client
static ssize_t rr_read(struct corerouter_peer *main_peer) {
ssize_t len = cr_read(main_peer, "rr_read()");
if (!len) return 0;
main_peer->session->peers->out = main_peer->in;
main_peer->session->peers->out_pos = 0;
cr_write_to_backend(main_peer->session->peers, rr_instance_write);
return len;
}
// retry the connection
static int rr_retry(struct corerouter_peer *peer) {
struct corerouter_session *cs = peer->session;
struct uwsgi_corerouter *ucr = cs->corerouter;
if (peer->instance_address_len > 0) goto retry;
if (ucr->mapper(ucr, peer)) {
return -1;
}
if (peer->instance_address_len == 0) {
return -1;
}
retry:
// start async connect (again)
cr_connect(peer, rr_instance_connected);
return 0;
}
static void rr_session_close(struct corerouter_session *cs) {
struct rawrouter_session *rr = (struct rawrouter_session *) cs;
if (rr->xclient) {
uwsgi_buffer_destroy(rr->xclient);
}
}
// allocate a new session
static int rawrouter_alloc_session(struct uwsgi_corerouter *ucr, struct uwsgi_gateway_socket *ugs, struct corerouter_session *cs, struct sockaddr *sa, socklen_t s_len) {
// set default read hook
cs->main_peer->last_hook_read = rr_read;
// set close hook
cs->close = rr_session_close;
// set retry hook
cs->retry = rr_retry;
if (sa && sa->sa_family == AF_INET) {
if (urr.xclient) {
struct rawrouter_session *rr = (struct rawrouter_session *) cs;
rr->xclient = uwsgi_buffer_new(13+sizeof(cs->client_address)+2);
if (uwsgi_buffer_append(rr->xclient, "XCLIENT ADDR=", 13)) return -1;
if (uwsgi_buffer_append(rr->xclient, cs->client_address, strlen(cs->client_address))) return -1;
if (uwsgi_buffer_append(rr->xclient, "\r\n", 2)) return -1;
}
}
// add a new peer
struct corerouter_peer *peer = uwsgi_cr_peer_add(cs);
// set default peer hook
peer->last_hook_read = rr_instance_read;
// use the address as hostname
memcpy(peer->key, cs->ugs->name, cs->ugs->name_len);
peer->key_len = cs->ugs->name_len;
// the mapper hook
if (ucr->mapper(ucr, peer)) {
return -1;
}
if (peer->instance_address_len == 0) {
return -1;
}
peer->can_retry = 1;
cr_connect(peer, rr_instance_connected);
return 0;
}
static int rawrouter_init() {
urr.cr.session_size = sizeof(struct rawrouter_session);
urr.cr.alloc_session = rawrouter_alloc_session;
uwsgi_corerouter_init((struct uwsgi_corerouter *) &urr);
return 0;
}
static void rawrouter_setup() {
urr.cr.name = uwsgi_str("uWSGI rawrouter");
urr.cr.short_name = uwsgi_str("rawrouter");
}
struct uwsgi_plugin rawrouter_plugin = {
.name = "rawrouter",
.options = rawrouter_options,
.init = rawrouter_init,
.on_load = rawrouter_setup
};
|