File: rawrouter.c

package info (click to toggle)
uwsgi 2.0.29-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,684 kB
  • sloc: ansic: 87,027; python: 7,001; cpp: 1,131; java: 708; perl: 678; sh: 585; ruby: 555; makefile: 148; xml: 130; cs: 121; objc: 37; php: 28; erlang: 20; javascript: 11
file content (285 lines) | stat: -rw-r--r-- 10,024 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
/*

   uWSGI rawrouter

*/

#include "../../uwsgi.h"
#include "../corerouter/cr.h"

// rawrouter plugin state; `urr` is the single file-scope instance
static struct uwsgi_rawrouter {
	// generic corerouter state; it has to stay the first member because
	// rawrouter_init() casts &urr to struct uwsgi_corerouter *
	struct uwsgi_corerouter cr;
	// set by the rawrouter-xclient option; when non-zero
	// rawrouter_alloc_session() prepares an XCLIENT banner for IPv4 clients
	int xclient;
} urr;

extern struct uwsgi_server uwsgi;

// per-connection state: the generic corerouter session plus the XCLIENT data
struct rawrouter_session {
	// base session; kept first because the hooks in this file cast a
	// struct corerouter_session * to struct rawrouter_session *
	struct corerouter_session session;

	// XCLIENT ADDR=xxx\r\n
	// built in rawrouter_alloc_session() and destroyed in rr_session_close();
	// the hooks test it for truth, so it is presumably NULL when XCLIENT is
	// not used for the session - TODO confirm the session memory is zeroed
	struct uwsgi_buffer *xclient;
	// not referenced anywhere in this file
	size_t xclient_pos;
	// placeholder for \r\n
	// rr_xclient_read() sets it to 1 once a '\r' has been seen
	size_t xclient_rn;
};

// Command line / config options of the rawrouter plugin.
// Every setter targets either the plugin state (&urr) or one field of its
// embedded corerouter (urr.cr.*).
// NOTE(review): entry layout (name, argument mode, short option, help text,
// setter, setter data, flags) is inferred from usage - confirm against
// struct uwsgi_option in uwsgi.h.
static struct uwsgi_option rawrouter_options[] = {
	{"rawrouter", required_argument, 0, "run the rawrouter on the specified port", uwsgi_opt_undeferred_corerouter, &urr, 0},
	// "rawrouter-processes" and "rawrouter-workers" write the same field
	{"rawrouter-processes", required_argument, 0, "prefork the specified number of rawrouter processes", uwsgi_opt_set_int, &urr.cr.processes, 0},
	{"rawrouter-workers", required_argument, 0, "prefork the specified number of rawrouter processes", uwsgi_opt_set_int, &urr.cr.processes, 0},
	{"rawrouter-zerg", required_argument, 0, "attach the rawrouter to a zerg server", uwsgi_opt_corerouter_zerg, &urr, 0},
	{"rawrouter-use-cache", optional_argument, 0, "use uWSGI cache as hostname->server mapper for the rawrouter", uwsgi_opt_set_str, &urr.cr.use_cache, 0},

	{"rawrouter-use-pattern", required_argument, 0, "use a pattern for rawrouter hostname->server mapping", uwsgi_opt_corerouter_use_pattern, &urr, 0},
	{"rawrouter-use-base", required_argument, 0, "use a base dir for rawrouter hostname->server mapping", uwsgi_opt_corerouter_use_base, &urr, 0},

	{"rawrouter-fallback", required_argument, 0, "fallback to the specified node in case of error", uwsgi_opt_add_string_list, &urr.cr.fallback, 0},

	{"rawrouter-use-code-string", required_argument, 0, "use code string as hostname->server mapper for the rawrouter", uwsgi_opt_corerouter_cs, &urr, 0},
	{"rawrouter-use-socket", optional_argument, 0, "forward request to the specified uwsgi socket", uwsgi_opt_corerouter_use_socket, &urr, 0},
	{"rawrouter-to", required_argument, 0, "forward requests to the specified uwsgi server (you can specify it multiple times for load balancing)", uwsgi_opt_add_string_list, &urr.cr.static_nodes, 0},
	{"rawrouter-gracetime", required_argument, 0, "retry connections to dead static nodes after the specified amount of seconds", uwsgi_opt_set_int, &urr.cr.static_node_gracetime, 0},
	{"rawrouter-events", required_argument, 0, "set the maximum number of concurrent events", uwsgi_opt_set_int, &urr.cr.nevents, 0},
	{"rawrouter-max-retries", required_argument, 0, "set the maximum number of retries/fallbacks to other nodes", uwsgi_opt_set_int, &urr.cr.max_retries, 0},
	// NOTE(review): a boolean setter combined with required_argument looks
	// unintended (compare rawrouter-cheap below); left untouched because
	// existing command lines may pass a value - confirm before changing
	{"rawrouter-quiet", required_argument, 0, "do not report failed connections to instances", uwsgi_opt_true, &urr.cr.quiet, 0},
	{"rawrouter-cheap", no_argument, 0, "run the rawrouter in cheap mode", uwsgi_opt_true, &urr.cr.cheap, 0},
	{"rawrouter-subscription-server", required_argument, 0, "run the rawrouter subscription server on the specified address", uwsgi_opt_corerouter_ss, &urr, 0},
	{"rawrouter-subscription-slot", required_argument, 0, "*** deprecated ***", uwsgi_opt_deprecated, (void *) "useless thanks to the new implementation", 0},

	{"rawrouter-timeout", required_argument, 0, "set rawrouter timeout", uwsgi_opt_set_int, &urr.cr.socket_timeout, 0},

	// three spellings of the same stats server option
	{"rawrouter-stats", required_argument, 0, "run the rawrouter stats server", uwsgi_opt_set_str, &urr.cr.stats_server, 0},
	{"rawrouter-stats-server", required_argument, 0, "run the rawrouter stats server", uwsgi_opt_set_str, &urr.cr.stats_server, 0},
	{"rawrouter-ss", required_argument, 0, "run the rawrouter stats server", uwsgi_opt_set_str, &urr.cr.stats_server, 0},
	{"rawrouter-harakiri", required_argument, 0, "enable rawrouter harakiri", uwsgi_opt_set_int, &urr.cr.harakiri, 0},

	{"rawrouter-xclient", no_argument, 0, "use the xclient protocol to pass the client address", uwsgi_opt_true, &urr.xclient, 0},

	{"rawrouter-buffer-size", required_argument, 0, "set internal buffer size (default: page size)", uwsgi_opt_set_64bit, &urr.cr.buffer_size, 0},

	// all-zero terminator entry
	{0, 0, 0, 0, 0, 0, 0},
};

// write to backend
// Write hook installed by rr_read(): pushes the pending output of a backend
// peer. Returns 0 when nothing was written, otherwise the value stored in
// `len` by cr_write().
// NOTE(review): cr_write(), cr_write_complete() and cr_reset_hooks() come from
// ../corerouter/cr.h (not visible here); they look like macros that may also
// return from this function on error - confirm there.
static ssize_t rr_instance_write(struct corerouter_peer *peer) {
	ssize_t len = cr_write(peer, "rr_instance_write()");
	// end on empty write
	if (!len) return 0;

	// the chunk has been sent, start (again) reading from client and instances
	if (cr_write_complete(peer)) {
		// reset the buffer
		// (rr_read() pointed peer->out at the client's input buffer)
		peer->out->pos = 0;
		cr_reset_hooks(peer);
	}

	return len;
}

// write to client
// Write hook for the client side (main peer), installed by rr_instance_read()
// and rr_xclient_write(). Returns 0 when nothing was written, otherwise the
// value stored in `len` by cr_write().
// NOTE(review): cr_write(), cr_write_complete() and cr_reset_hooks() come from
// ../corerouter/cr.h (not visible here) and may also return on error - confirm.
static ssize_t rr_write(struct corerouter_peer *main_peer) {
	ssize_t len = cr_write(main_peer, "rr_write()");
	// end on empty write
	if (!len) return 0;

	// ok this response chunk is sent, let's start reading again
	if (cr_write_complete(main_peer)) {
		// reset the buffer
		// (main_peer->out is the backend's input buffer, set by the reader hooks)
		main_peer->out->pos = 0;
                cr_reset_hooks(main_peer);
        } 

	return len;
}

// read from backend
// Default read hook of a backend peer: whatever was read is handed to the
// client side by sharing the buffer (no copy is made here).
// Returns 0 when the read produced no bytes, otherwise the value stored in
// `len` by cr_read().
// NOTE(review): cr_read() and cr_write_to_main() come from ../corerouter/cr.h
// (not visible here) and may also return on error - confirm.
static ssize_t rr_instance_read(struct corerouter_peer *peer) {
	ssize_t len = cr_read(peer, "rr_instance_read()");
	if (!len) return 0;

	// set the input buffer as the main output one
	peer->session->main_peer->out = peer->in;
	// start sending from the beginning of that buffer
	peer->session->main_peer->out_pos = 0;

	// let rr_write() push the data to the client
	cr_write_to_main(peer, rr_write);
	return len;
}

// write the xclient banner
// Write hook installed by rr_xclient_read(): sends the session's XCLIENT
// buffer to the backend, then switches the peer to plain relaying.
// Returns 0 when nothing was written, otherwise the value stored in `len` by
// cr_write_buf().
// NOTE(review): the cr_* helpers come from ../corerouter/cr.h (not visible
// here) and may also return on error - confirm.
static ssize_t rr_xclient_write(struct corerouter_peer *peer) {
        struct corerouter_session *cs = peer->session;
        struct rawrouter_session *rr = (struct rawrouter_session *) cs;
        ssize_t len = cr_write_buf(peer, rr->xclient, "rr_xclient_write()");
        if (!len) return 0;

        if (cr_write_complete_buf(peer, rr->xclient)) {
                // rr_xclient_read() stores a non-zero out_pos when backend bytes
                // followed the first line
                // NOTE(review): assumes out_pos is 0 otherwise - confirm
                if (peer->session->main_peer->out_pos > 0) {
                        // (eventually) send previous data
			peer->last_hook_read = rr_instance_read;
                        cr_write_to_main(peer, rr_write);
                }
                else {
                        // reset to standard behaviour
			// rewind the backend input buffer before normal reads start
			peer->in->pos = 0;
			cr_reset_hooks_and_read(peer, rr_instance_read);
                }
        }

        return len;
}

// read the first line from the backend and skip it
// Read hook used while the session has an XCLIENT buffer: scans the bytes
// coming from the backend for the "\r\n" ending its first line, then schedules
// rr_xclient_write().
// Returns 0 when the read produced no bytes, -1 when a '\r' is not followed by
// '\n', otherwise the value stored in `len` by cr_read().
// NOTE(review): cr_read() and cr_write_to_backend() come from
// ../corerouter/cr.h (not visible here) and may also return on error - confirm.
static ssize_t rr_xclient_read(struct corerouter_peer *peer) {
	struct corerouter_session *cs = peer->session;
        struct rawrouter_session *rr = (struct rawrouter_session *) cs;
        ssize_t len = cr_read(peer, "rr_xclient_read()");
	if (!len) return 0;

	// first byte of the chunk just read
	// assumes cr_read() already advanced peer->in->pos by len - TODO confirm
	char *ptr = (peer->in->buf + peer->in->pos) - len;
	ssize_t i;
	for(i=0;i<len;i++) {
		// xclient_rn lives in the session, so a '\r' ending one read is
		// matched against the first byte of the next one
		if (rr->xclient_rn == 1) {
			if (ptr[i] != '\n') {
				return -1;
			}
			// banner received (skip it, will be sent later)
			// bytes of this chunk that follow the '\n'
			size_t remains = len - (i+1);
			if (remains > 0) {
				// expose them to the client side, starting right after the
				// first line; rr_xclient_write() checks out_pos > 0
				peer->session->main_peer->out = peer->in;
				peer->session->main_peer->out_pos = (peer->in->pos - remains) ;
			}
			cr_write_to_backend(peer, rr_xclient_write);
			return len;
		}
		else if (ptr[i] == '\r') {
			rr->xclient_rn = 1;
		}
	}

	// no line ending yet: keep reading
	return len;
}

// the instance is connected now we cannot retry connections
// Hook passed to cr_connect() by rawrouter_alloc_session() and rr_retry().
// Clears can_retry and selects the read hook of the backend peer:
// rr_xclient_read() when the session has an XCLIENT buffer, rr_instance_read()
// otherwise. Returns 1 on both paths.
// NOTE(review): cr_peer_connected() and cr_reset_hooks_and_read() come from
// ../corerouter/cr.h (not visible here) and may also return on error - confirm.
static ssize_t rr_instance_connected(struct corerouter_peer *peer) {

	struct corerouter_session *cs = peer->session;
        struct rawrouter_session *rr = (struct rawrouter_session *) cs;
	cr_peer_connected(peer, "rr_instance_connected()");

	peer->can_retry = 0;

	// the first line of the backend has to be consumed before the banner is sent
	if (rr->xclient) {
		cr_reset_hooks_and_read(peer, rr_xclient_read);
		return 1;
	}
	// plain relay
	cr_reset_hooks_and_read(peer, rr_instance_read);
	return 1;
}

// read from client
// Default read hook of the main (client) peer, installed by
// rawrouter_alloc_session(): whatever was read is handed to the peer at the
// head of the session's peer list by sharing the buffer.
// Returns 0 when the read produced no bytes, otherwise the value stored in
// `len` by cr_read().
// NOTE(review): cr_read() and cr_write_to_backend() come from
// ../corerouter/cr.h (not visible here) and may also return on error - confirm.
static ssize_t rr_read(struct corerouter_peer *main_peer) {
	ssize_t len = cr_read(main_peer, "rr_read()");
	if (!len) return 0;

	// the client input buffer becomes the backend output buffer
	main_peer->session->peers->out = main_peer->in;
	// start sending from the beginning of that buffer
	main_peer->session->peers->out_pos = 0;

	// let rr_instance_write() push the data to the backend
	cr_write_to_backend(main_peer->session->peers, rr_instance_write);
	return len;
}

// retry the connection
// Retry hook of the session. A peer that already has an instance address is
// connected again as is; otherwise the corerouter mapper is asked for an
// address first.
// Returns 0 once the connect has been started, -1 when the mapper fails or
// leaves the peer without an address.
// NOTE(review): cr_connect() comes from ../corerouter/cr.h (not visible here)
// and may also return on its own - confirm.
static int rr_retry(struct corerouter_peer *peer) {

	struct corerouter_session *cs = peer->session;
	struct uwsgi_corerouter *ucr = cs->corerouter;

	// no usable address yet: resolve one before connecting
	if (!(peer->instance_address_len > 0)) {
		if (ucr->mapper(ucr, peer) || peer->instance_address_len == 0) {
			return -1;
		}
	}

	// start async connect (again)
	cr_connect(peer, rr_instance_connected);
	return 0;
}

// close hook of the session: releases the XCLIENT buffer, when one was built
static void rr_session_close(struct corerouter_session *cs) {
	struct uwsgi_buffer *banner = ((struct rawrouter_session *) cs)->xclient;

	// sessions without XCLIENT have nothing to release
	if (!banner) {
		return;
	}

	uwsgi_buffer_destroy(banner);
}

// allocate a new session
// Installs the rawrouter hooks on the session, builds the XCLIENT banner when
// requested, then adds one backend peer, maps it to an instance and starts
// the connection.
// Returns 0 once the connect has been started, -1 on failure.
// NOTE(review): cr_connect() comes from ../corerouter/cr.h (not visible here)
// and may also return on its own - confirm.
static int rawrouter_alloc_session(struct uwsgi_corerouter *ucr, struct uwsgi_gateway_socket *ugs, struct corerouter_session *cs, struct sockaddr *sa, socklen_t s_len) {

	// session hooks: default client read, retry and close
	cs->main_peer->last_hook_read = rr_read;
	cs->retry = rr_retry;
	cs->close = rr_session_close;

	// the banner is built only for IPv4 clients, and only if rawrouter-xclient is set
	if (urr.xclient && sa && sa->sa_family == AF_INET) {
		struct rawrouter_session *rr = (struct rawrouter_session *) cs;
		// room for "XCLIENT ADDR=", the client address and "\r\n"
		rr->xclient = uwsgi_buffer_new(13+sizeof(cs->client_address)+2);
		if (uwsgi_buffer_append(rr->xclient, "XCLIENT ADDR=", 13)
			|| uwsgi_buffer_append(rr->xclient, cs->client_address, strlen(cs->client_address))
			|| uwsgi_buffer_append(rr->xclient, "\r\n", 2)) {
			return -1;
		}
	}

	// add a new peer, reading from the backend by default
	struct corerouter_peer *peer = uwsgi_cr_peer_add(cs);
	peer->last_hook_read = rr_instance_read;

	// use the address as hostname
	// NOTE(review): name_len is not checked against the capacity of peer->key here - confirm it is bounded elsewhere
	memcpy(peer->key, cs->ugs->name, cs->ugs->name_len);
	peer->key_len = cs->ugs->name_len;

	// the mapper hook has to succeed and to produce an instance address
	if (ucr->mapper(ucr, peer) || peer->instance_address_len == 0) {
		return -1;
	}

	// retries stay allowed until rr_instance_connected() clears the flag
	peer->can_retry = 1;
	cr_connect(peer, rr_instance_connected);
	return 0;
}

static int rawrouter_init() {

	urr.cr.session_size = sizeof(struct rawrouter_session);
	urr.cr.alloc_session = rawrouter_alloc_session;
	uwsgi_corerouter_init((struct uwsgi_corerouter *) &urr);

	return 0;
}

static void rawrouter_setup() {
	urr.cr.name = uwsgi_str("uWSGI rawrouter");
	urr.cr.short_name = uwsgi_str("rawrouter");
}

// plugin descriptor; non-static, so it is visible outside this file
struct uwsgi_plugin rawrouter_plugin = {
	.name = "rawrouter",
	// fills in the corerouter names
	.on_load = rawrouter_setup,
	// registers the session allocator and initializes the corerouter
	.init = rawrouter_init,
	// the rawrouter-* options
	.options = rawrouter_options,
};