/*
 * File: cr.h
 * Package: uwsgi 2.0.29-1 (area: main; suites: forky, sid)
 * Original file: 368 lines, 12,799 bytes
 */
/*
 * Corerouter status codes.
 * NOTE(review): presumably the lifecycle states of a connection; their users
 * are not visible in this header - confirm against the callers.
 */
#define COREROUTER_STATUS_FREE        0
#define COREROUTER_STATUS_CONNECTING  1
#define COREROUTER_STATUS_RECV_HDR    2
#define COREROUTER_STATUS_RESPONSE    3

/*
 * Timeout helpers.
 *
 * u - pointer to an object exposing a `timeouts` tree (struct uwsgi_corerouter)
 * x - pointer to an object exposing `timeout` and `current_timeout`
 *     (struct corerouter_peer)
 * t - base time supplied by the caller
 *
 * cr_add_timeout(u, x)         registers x in u->timeouts with the expiration
 *                              uwsgi_now() + x->current_timeout and evaluates
 *                              to the value returned by uwsgi_add_rb_timer().
 * cr_add_timeout_fast(u, x, t) same, but uses t instead of calling uwsgi_now().
 * cr_del_timeout(u, x)         removes x->timeout from u->timeouts and free()s
 *                              it. x->timeout is NOT reset: the caller must
 *                              reassign it (or stop using it) afterwards.
 *
 * All parameters are parenthesized so that expressions such as `&peer` work.
 * cr_del_timeout is wrapped in do/while(0) so that it is a single statement
 * (safe as the body of an unbraced if/else); terminate it with `;`.
 * x is evaluated more than once: do not pass expressions with side effects.
 */
#define cr_add_timeout(u, x) uwsgi_add_rb_timer((u)->timeouts, uwsgi_now() + (x)->current_timeout, (x))
#define cr_add_timeout_fast(u, x, t) uwsgi_add_rb_timer((u)->timeouts, (t) + (x)->current_timeout, (x))
#define cr_del_timeout(u, x) do {\
		uwsgi_del_rb_timer((u)->timeouts, (x)->timeout);\
		free((x)->timeout);\
	} while (0)

/*
 * Logging helpers. Both expand to ONE uwsgi_log() call expression whose
 * message starts with
 *   [uwsgi-<short_name> key: <key> client_addr: <addr> client_port: <port>]
 *
 * x - pointer to a peer. When x is the main_peer of its session the key is
 *     taken from the first entry of session->peers (empty key when that list
 *     is empty); otherwise x's own key/key_len are used.
 *     x is evaluated several times: do not pass expressions with side effects.
 * y - uwsgi_cr_error: a string printed with %s, followed by strerror(errno)
 *     and the __FILE__/__LINE__ of the call site.
 *     uwsgi_cr_log: a string LITERAL (it is pasted after the prefix by literal
 *     concatenation) used as printf-style format for the remaining arguments.
 *     At least one argument must follow y because __VA_ARGS__ is preceded by
 *     a comma.
 *
 * The parameter x is parenthesized so that arguments such as `&peer` or
 * `list + 1` expand correctly.
 */
#define uwsgi_cr_error(x, y) uwsgi_log("[uwsgi-%s key: %.*s client_addr: %s client_port: %s] %s: %s [%s line %d]\n",\
	(x)->session->corerouter->short_name,\
	((x) == (x)->session->main_peer) ? ((x)->session->peers ? (x)->session->peers->key_len : 0) : (x)->key_len,\
	((x) == (x)->session->main_peer) ? ((x)->session->peers ? (x)->session->peers->key : "") : (x)->key,\
	(x)->session->client_address, (x)->session->client_port,\
	(y), strerror(errno), __FILE__, __LINE__)
#define uwsgi_cr_log(x, y, ...) uwsgi_log("[uwsgi-%s key: %.*s client_addr: %s client_port: %s]" y,\
	(x)->session->corerouter->short_name,\
	((x) == (x)->session->main_peer) ? ((x)->session->peers ? (x)->session->peers->key_len : 0) : (x)->key_len,\
	((x) == (x)->session->main_peer) ? ((x)->session->peers ? (x)->session->peers->key : "") : (x)->key,\
	(x)->session->client_address, (x)->session->client_port,\
	__VA_ARGS__)

/*
 * Early-return helper for non-blocking I/O.
 * When errno is EAGAIN, EWOULDBLOCK or EINPROGRESS it normalizes errno to
 * EINPROGRESS and makes the ENCLOSING function return -1; for any other errno
 * value it does nothing.
 * Wrapped in do/while(0) so that it is a single statement (usable as the body
 * of an unbraced if/else); always write it as `cr_try_again;`.
 */
#define cr_try_again do {\
		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) {\
			errno = EINPROGRESS;\
			return -1;\
		}\
	} while (0)

// cr_write(peer, f): write the still-pending part of peer->out to peer->fd.
// The expansion STARTS with the write() call and the statements that follow
// reference a variable named `len`, so the macro is meant to be used as the
// right-hand side of an assignment to a signed `len` in the caller
// (presumably `ssize_t len = cr_write(peer, "label");`).
// When len < 0:
//   - cr_try_again returns -1 from the enclosing function for retryable errors;
//   - otherwise the error is logged via uwsgi_cr_error with label f and the
//     enclosing function returns -1.
// On success, len is added to peer->un->rx (only for non-main peers that have
// `un` set) and peer->out_pos is advanced by len.
// Expands to several statements: never use it as an unbraced if/else body.
#define cr_write(peer, f) write(peer->fd, peer->out->buf + peer->out_pos, peer->out->pos - peer->out_pos);\
	if (len < 0) {\
                cr_try_again;\
                uwsgi_cr_error(peer, f);\
                return -1;\
        }\
	if (peer != peer->session->main_peer && peer->un) peer->un->rx+=len;\
        peer->out_pos += len;

// cr_write_buf(peer, ubuf, f): like cr_write, but sends an arbitrary buffer
// `ubuf` instead of peer->out. The write offset is kept in a variable/member
// whose name is the last token of `ubuf` with `_pos` appended (token pasting:
// ubuf##_pos), which must exist at the call site.
// Must be used as the right-hand side of an assignment to a signed variable
// named `len`, which the following statements read.
// When len < 0: cr_try_again may return -1 for retryable errors, otherwise the
// error is logged with label f and the enclosing function returns -1.
// On success, len is added to peer->un->rx (non-main peers with `un` set) and
// the <ubuf>_pos offset is advanced by len.
// Expands to several statements: never use it as an unbraced if/else body.
#define cr_write_buf(peer, ubuf, f) write(peer->fd, ubuf->buf + ubuf##_pos, ubuf->pos - ubuf##_pos);\
        if (len < 0) {\
                cr_try_again;\
                uwsgi_cr_error(peer, f);\
                return -1;\
        }\
	if (peer != peer->session->main_peer && peer->un) peer->un->rx+=len;\
        ubuf##_pos += len;

// Non-zero when the write offset of peer has reached the end of peer->out,
// i.e. nothing is left to send.
// The whole expansion is parenthesized so that it composes correctly with
// operators such as `!`, `&&` and `||`; peer is evaluated twice.
#define cr_write_complete(peer) ((peer)->out_pos == (peer)->out->pos)

// Non-zero when the offset <buf>_pos (token pasting: buf##_pos) has reached
// buf->pos, i.e. the whole buffer has been sent. `peer` is accepted for
// symmetry with cr_write_buf but is not used by the expansion.
// The whole expansion is parenthesized so that it composes correctly with
// operators such as `!`, `&&` and `||`.
#define cr_write_complete_buf(peer, buf) (buf##_pos == (buf)->pos)

// cr_connect(peer, f): start a connection to peer->instance_address.
// Stores the result of uwsgi_connectn(address, address_len, 0, 1) in peer->fd.
// NOTE(review): the trailing 0 and 1 are presumably the timeout and the
// non-blocking flag - confirm against uwsgi_connectn.
// On failure (fd < 0): sets peer->failed, saves errno in peer->soopt and makes
// the enclosing function return -1.
// On success: stores the peer in corerouter->cr_table at index peer->fd (no
// bounds check is done here), sets peer->connecting and, through
// cr_write_to_backend, passes f to uwsgi_cr_set_hooks for this peer.
// Expands to several statements and (via cr_write_to_backend) declares a local
// named `peers`: use it once per scope and never as an unbraced if/else body.
#define cr_connect(peer, f) peer->fd = uwsgi_connectn(peer->instance_address, peer->instance_address_len, 0, 1);\
        if (peer->fd < 0) {\
                peer->failed = 1;\
                peer->soopt = errno;\
                return -1;\
        }\
        peer->session->corerouter->cr_table[peer->fd] = peer;\
        peer->connecting = 1;\
	cr_write_to_backend(peer, f);

// cr_read(peer, f): read from peer->fd into the free space of peer->in
// (from in->pos up to in->len).
// The expansion STARTS with the read() call and the statements that follow
// reference a variable named `len`, so it must be used as the right-hand side
// of an assignment to a signed `len` in the caller.
// When len < 0:
//   - cr_try_again returns -1 from the enclosing function for retryable errors;
//   - otherwise the error is logged via uwsgi_cr_error with label f and the
//     enclosing function returns -1.
// On success, len is added to peer->un->tx (only for non-main peers that have
// `un` set) and peer->in->pos is advanced by len (len == 0 is not treated
// specially here).
// The parameter is parenthesized and the definition no longer ends with a
// line continuation, so it cannot swallow the line that follows it.
// Expands to several statements: never use it as an unbraced if/else body.
#define cr_read(peer, f) read((peer)->fd, (peer)->in->buf + (peer)->in->pos, (peer)->in->len - (peer)->in->pos);\
	if (len < 0) {\
		cr_try_again;\
		uwsgi_cr_error((peer), f);\
		return -1;\
	}\
	if ((peer) != (peer)->session->main_peer && (peer)->un) (peer)->un->tx += len;\
	(peer)->in->pos += len;

// cr_read_exact(peer, l, f): like cr_read, but asks read() for at most
// (l - peer->in->pos) bytes, i.e. never reads past a total of l bytes in
// peer->in. The caller is responsible for l being >= in->pos and for the
// buffer being large enough; neither is checked here.
// Must be used as the right-hand side of an assignment to a signed variable
// named `len`, which the following statements read.
// When len < 0: cr_try_again may return -1 for retryable errors, otherwise the
// error is logged with label f and the enclosing function returns -1.
// On success, len is added to peer->un->tx (non-main peers with `un` set) and
// peer->in->pos is advanced by len.
// Parameters are parenthesized and the definition no longer ends with a line
// continuation, so it cannot swallow the line that follows it.
// Expands to several statements: never use it as an unbraced if/else body.
#define cr_read_exact(peer, l, f) read((peer)->fd, (peer)->in->buf + (peer)->in->pos, ((l) - (peer)->in->pos));\
	if (len < 0) {\
		cr_try_again;\
		uwsgi_cr_error((peer), f);\
		return -1;\
	}\
	if ((peer) != (peer)->session->main_peer && (peer)->un) (peer)->un->tx += len;\
	(peer)->in->pos += len;

// cr_reset_hooks(peer): re-arm the hooks of every peer of peer's session.
//  - main_peer: uwsgi_cr_set_hooks(main_peer, main_peer->last_hook_read, NULL),
//    unless main_peer->disabled is set, in which case both hooks are NULL;
//  - every entry of session->peers: uwsgi_cr_set_hooks(p, p->last_hook_read, NULL).
// NOTE(review): the 2nd and 3rd arguments of uwsgi_cr_set_hooks are presumably
// the read and the write hook; its prototype leaves them unnamed.
// If any uwsgi_cr_set_hooks call returns non-zero, the enclosing function
// returns -1 immediately.
// Declares a local variable named `peers`: use it at most once per scope and
// never as an unbraced if/else body.
#define cr_reset_hooks(peer) if(!peer->session->main_peer->disabled) {\
			if (uwsgi_cr_set_hooks(peer->session->main_peer, peer->session->main_peer->last_hook_read, NULL)) return -1;\
		}\
		else {\
			if (uwsgi_cr_set_hooks(peer->session->main_peer, NULL, NULL)) return -1;\
		}\
		struct corerouter_peer *peers = peer->session->peers;\
                while(peers) {\
                        if (uwsgi_cr_set_hooks(peers, peers->last_hook_read, NULL)) return -1;\
                        peers = peers->next;\
                }

// cr_reset_hooks_and_read(peer, f): like cr_reset_hooks, but first records f
// as peer->last_hook_read.
// Order of operations:
//  1. main_peer gets uwsgi_cr_set_hooks(main_peer, main_peer->last_hook_read, NULL)
//     (unlike cr_reset_hooks, main_peer->disabled is NOT checked);
//  2. peer->last_hook_read = f;
//  3. every entry of session->peers gets
//     uwsgi_cr_set_hooks(p, p->last_hook_read, NULL), so `peer` ends up with f
//     when it is part of that list.
// If any uwsgi_cr_set_hooks call returns non-zero, the enclosing function
// returns -1 immediately.
// Declares a local variable named `peers`: use it at most once per scope and
// never as an unbraced if/else body.
#define cr_reset_hooks_and_read(peer, f) if (uwsgi_cr_set_hooks(peer->session->main_peer, peer->session->main_peer->last_hook_read, NULL)) return -1;\
		peer->last_hook_read = f;\
                struct corerouter_peer *peers = peer->session->peers;\
                while(peers) {\
                        if (uwsgi_cr_set_hooks(peers, peers->last_hook_read, NULL)) return -1;\
                        peers = peers->next;\
                }

// cr_write_to_main(peer, f): make the session's main_peer the only peer with a
// hook installed.
//  - main_peer: uwsgi_cr_set_hooks(main_peer, NULL, f);
//  - every entry of session->peers: uwsgi_cr_set_hooks(p, NULL, NULL).
// NOTE(review): f is passed as the 3rd argument, presumably the write hook.
// If any uwsgi_cr_set_hooks call returns non-zero, the enclosing function
// returns -1 immediately.
// Declares a local variable named `peers`: use it at most once per scope and
// never as an unbraced if/else body.
#define cr_write_to_main(peer, f) if (uwsgi_cr_set_hooks(peer->session->main_peer, NULL, f)) return -1;\
		struct corerouter_peer *peers = peer->session->peers;\
                while(peers) {\
                        if (uwsgi_cr_set_hooks(peers, NULL, NULL)) return -1;\
                        peers = peers->next;\
                }

// cr_write_to_backend(peer, f): make `peer` the only peer of its session with
// a hook installed.
//  - main_peer: uwsgi_cr_set_hooks(main_peer, NULL, NULL);
//  - peer: uwsgi_cr_set_hooks(peer, NULL, f);
//  - every OTHER entry of session->peers: uwsgi_cr_set_hooks(p, NULL, NULL).
// NOTE(review): f is passed as the 3rd argument, presumably the write hook.
// If any uwsgi_cr_set_hooks call returns non-zero, the enclosing function
// returns -1 immediately.
// Declares a local variable named `peers`: use it at most once per scope and
// never as an unbraced if/else body.
#define cr_write_to_backend(peer, f) if (uwsgi_cr_set_hooks(peer->session->main_peer, NULL, NULL)) return -1;\
		if (uwsgi_cr_set_hooks(peer, NULL, f)) return -1;\
                struct corerouter_peer *peers = peer->session->peers;\
                while(peers) {\
			if (peers != peer) {\
                        	if (uwsgi_cr_set_hooks(peers, NULL, NULL)) return -1;\
			}\
                        peers = peers->next;\
                }

// cr_peer_connected(peer, f): check the outcome of a pending connect.
// Reads SO_ERROR of peer->fd into peer->soopt.
//  - getsockopt() failure: logs it via uwsgi_cr_error with the label
//    f "/getsockopt()" (f must be a string literal), sets peer->failed and
//    makes the enclosing function return -1;
//  - non-zero SO_ERROR: sets peer->failed and returns -1 from the enclosing
//    function (nothing is logged here);
//  - success: clears peer->connecting and peer->can_retry, increments
//    static_node->custom2 when static_node is set, and increments
//    un->requests and un->last_requests when un is set.
// Declares a local variable named `solen`: use it at most once per scope and
// never as an unbraced if/else body.
// The parameter is parenthesized and the definition no longer ends with a
// line continuation, so it cannot swallow the line that follows it.
#define cr_peer_connected(peer, f) socklen_t solen = sizeof(int);\
	if (getsockopt((peer)->fd, SOL_SOCKET, SO_ERROR, (void *) (&(peer)->soopt), &solen) < 0) {\
		uwsgi_cr_error((peer), f "/getsockopt()");\
		(peer)->failed = 1;\
		return -1;\
	}\
	if ((peer)->soopt) {\
		(peer)->failed = 1;\
		return -1;\
	}\
	(peer)->connecting = 0;\
	(peer)->can_retry = 0;\
	if ((peer)->static_node) (peer)->static_node->custom2++;\
	if ((peer)->un) {\
		(peer)->un->requests++;\
		(peer)->un->last_requests++;\
	}


struct corerouter_session;

/*
 * A peer wraps one socket of a session - either the client or a backend -
 * that can be monitored for events.
 * Field order and types are part of the layout shared with the router
 * plugins: do not reorder.
 */
struct corerouter_peer {
	int fd;						/* file descriptor of the socket */
	struct corerouter_session *session;		/* session owning this peer */

	int disabled;					/* when set, do not wait for events */

	/* hooks: the "last" read hook is what the cr_reset_hooks* macros restore */
	ssize_t (*hook_read)(struct corerouter_peer *);		/* run on a read event */
	ssize_t (*last_hook_read)(struct corerouter_peer *);
	ssize_t (*hook_write)(struct corerouter_peer *);	/* run on a write event */
	ssize_t (*last_hook_write)(struct corerouter_peer *);

	int failed;					/* has the peer failed ? */
	int connecting;					/* is a connect in progress ? */
	int soopt;					/* connection error (errno / SO_ERROR value) */
	int timed_out;					/* has the peer timed out ? */
	struct uwsgi_rb_timer *timeout;			/* node in the timeouts rb tree */

	/* each peer can map to a different instance */
	char *tmp_socket_name;
	char *instance_address;
	uint64_t instance_address_len;

	/* backend info */
	struct uwsgi_subscribe_node *un;
	struct uwsgi_string_list *static_node;

	struct uwsgi_buffer *in;			/* incoming data */
	struct uwsgi_buffer *out;			/* data to send */
	size_t out_pos;					/* bytes of `out` already sent (partial writes) */
	int out_need_free;

	uint32_t sid;					/* stream id (could have various uses) */

	int r_parser_status;				/* internal parser status */

	int can_retry;					/* can retry ? */
	uint16_t retries;				/* how many retries ? */

	/* parsed key (not necessarily NUL terminated: always pair it with key_len) */
	char key[0xff];
	uint8_t key_len;

	uint8_t modifier1;
	uint8_t modifier2;

	/* links to the neighbouring peers of the session */
	struct corerouter_peer *prev;
	struct corerouter_peer *next;

	int current_timeout;				/* offset used by cr_add_timeout*() */

	ssize_t (*flush)(struct corerouter_peer *);

	int is_flushing;
	int is_buffering;
	int buffering_fd;
};

/*
 * Shared state of one corerouter-based router.
 * Field order and types are part of the layout shared with the router
 * plugins: do not reorder.
 */
struct uwsgi_corerouter {

	/* short_name is the tag printed by uwsgi_cr_error()/uwsgi_cr_log() */
	char *name;
	char *short_name;
	size_t session_size;

	/* callbacks; every uwsgi_cr_map_use_*() function has the `mapper` signature */
	int (*alloc_session)(struct uwsgi_corerouter *, struct uwsgi_gateway_socket *, struct corerouter_session *, struct sockaddr *, socklen_t);
	int (*mapper)(struct uwsgi_corerouter *, struct corerouter_peer *);

	int has_sockets;
	int has_backends;
	int has_subscription_sockets;

	int processes;
	int quiet;

	/* tree used by cr_add_timeout*() and cr_del_timeout() */
	struct uwsgi_rbtree *timeouts;

	char *use_cache;
	struct uwsgi_cache *cache;
	int nevents;

	int max_retries;

	char *magic_table[256];

	int queue;

	char *pattern;
	int pattern_len;

	char *base;
	int base_len;

	size_t post_buffering;
	char *pb_base_dir;

	struct uwsgi_string_list *static_nodes;
	struct uwsgi_string_list *current_static_node;
	int static_node_gracetime;

	char *stats_server;
	int cr_stats_server;

	int use_socket;
	int socket_num;
	struct uwsgi_socket *to_socket;

	struct uwsgi_subscribe_slot **subscriptions;

	struct uwsgi_string_list *fallback;

	int socket_timeout;

	uint8_t code_string_modifier1;
	char *code_string_code;
	char *code_string_function;

	struct uwsgi_rb_timer *subscriptions_check;

	int cheap;
	int i_am_cheap;

	int tolerance;
	int harakiri;

	/* peers indexed by file descriptor (filled by cr_connect) */
	struct corerouter_peer **cr_table;

	int interesting_fd;

	uint64_t active_sessions;

	uid_t uid;
	gid_t gid;

	struct uwsgi_string_list *resubscribe;
	char *resubscribe_bind;

	size_t buffer_size;
	int fallback_on_no_key;

	char *fallback_key;
	int fallback_key_len;
};

/*
 * A session is started when a client connects to the router.
 * Field order and types are part of the layout shared with the router
 * plugins: do not reorder.
 */
struct corerouter_session {

	struct uwsgi_corerouter *corerouter;	/* corerouter this session belongs to */
	struct uwsgi_gateway_socket *ugs;	/* gateway socket this session came from */

	struct uwsgi_string_list *fallback;	/* list of fallback instances */

	/* client address storage */
	struct sockaddr_un addr;
	socklen_t addr_len;

	void (*close)(struct corerouter_session *);
	int (*retry)(struct corerouter_peer *);

	int can_keepalive;			/* leave the main peer alive */
	int wait_full_write;			/* destroy the main peer after the last full write */

	struct corerouter_peer *main_peer;	/* the peer of the client */
	struct corerouter_peer *peers;		/* linked list of backend peers */

	/* peer to connect after the next successful write */
	struct corerouter_peer *connect_peer_after_write;

	union uwsgi_sockaddr client_sockaddr;
#ifdef AF_INET6
	char client_address[INET6_ADDRSTRLEN];
#else
	char client_address[INET_ADDRSTRLEN];
#endif

	/* 11 bytes, to be snprintf friendly */
	char client_port[11];
};

// Handlers sharing the (char *, char *, void *) signature; they are defined
// outside this header.
// NOTE(review): the names suggest uwsgi option callbacks (presumably option
// name, option value, router-specific data) - confirm against the definitions.
void uwsgi_opt_corerouter(char *, char *, void *);
void uwsgi_opt_undeferred_corerouter(char *, char *, void *);
void uwsgi_opt_corerouter_use_socket(char *, char *, void *);
void uwsgi_opt_corerouter_use_base(char *, char *, void *);
void uwsgi_opt_corerouter_use_pattern(char *, char *, void *);
void uwsgi_opt_corerouter_zerg(char *, char *, void *);
void uwsgi_opt_corerouter_cs(char *, char *, void *);
void uwsgi_opt_corerouter_ss(char *, char *, void *);
void uwsgi_opt_corerouter_fallback_key(char *, char *, void *);


// Subscription handling, event queue / socket setup, router initialization and
// session allocation / teardown. All of these are defined outside this header;
// the parameter meanings below are NOT documented here because the prototypes
// leave most of them unnamed.
// NOTE(review): corerouter_manage_subscription takes two (char *, uint16_t)
// pairs, presumably key/value buffers with their lengths - confirm.
void corerouter_manage_subscription(char *, uint16_t, char *, uint16_t, void *);

void *uwsgi_corerouter_setup_event_queue(struct uwsgi_corerouter *, int);
void uwsgi_corerouter_manage_subscription(struct uwsgi_corerouter *, int id, struct uwsgi_gateway_socket *);
void uwsgi_corerouter_manage_internal_subscription(struct uwsgi_corerouter *, int);
void uwsgi_corerouter_setup_sockets(struct uwsgi_corerouter *);

int uwsgi_corerouter_init(struct uwsgi_corerouter *);

// corerouter_alloc_session returns a pointer to the session; what it returns on
// failure is not visible here (presumably NULL - confirm).
struct corerouter_session *corerouter_alloc_session(struct uwsgi_corerouter *, struct uwsgi_gateway_socket *, int, struct sockaddr *, socklen_t);
void corerouter_close_session(struct uwsgi_corerouter *, struct corerouter_session *);

// Mapping functions. Every uwsgi_cr_map_use_*() has the same signature as the
// `mapper` member of struct uwsgi_corerouter, so any of them can be stored
// there. They are defined outside this header.
// NOTE(review): presumably each one resolves the peer's key to a backend
// address using a different strategy, returning non-zero on error - confirm.
int uwsgi_cr_map_use_void(struct uwsgi_corerouter *, struct corerouter_peer *);
int uwsgi_cr_map_use_cache(struct uwsgi_corerouter *, struct corerouter_peer *);
int uwsgi_cr_map_use_pattern(struct uwsgi_corerouter *, struct corerouter_peer *);
int uwsgi_cr_map_use_cluster(struct uwsgi_corerouter *, struct corerouter_peer *);
int uwsgi_cr_map_use_subscription(struct uwsgi_corerouter *, struct corerouter_peer *);
int uwsgi_cr_map_use_subscription_dotsplit(struct uwsgi_corerouter *, struct corerouter_peer *);
int uwsgi_cr_map_use_base(struct uwsgi_corerouter *, struct corerouter_peer *);
int uwsgi_cr_map_use_cs(struct uwsgi_corerouter *, struct corerouter_peer *);
int uwsgi_cr_map_use_to(struct uwsgi_corerouter *, struct corerouter_peer *);
int uwsgi_cr_map_use_static_nodes(struct uwsgi_corerouter *, struct corerouter_peer *);

int uwsgi_corerouter_has_backends(struct uwsgi_corerouter *);

// Peer management helpers, defined outside this header.
// uwsgi_cr_set_hooks: the macros in this header treat a non-zero return value
// as an error. NOTE(review): the two function-pointer arguments are presumably
// the read hook and the write hook, in that order - confirm.
int uwsgi_cr_set_hooks(struct corerouter_peer *, ssize_t (*)(struct corerouter_peer *), ssize_t (*)(struct corerouter_peer *));
struct corerouter_peer *uwsgi_cr_peer_add(struct corerouter_session *);
struct corerouter_peer *uwsgi_cr_peer_find_by_sid(struct corerouter_session *, uint32_t);
void corerouter_close_peer(struct uwsgi_corerouter *, struct corerouter_peer *);
// Returns a timer node. NOTE(review): presumably the caller stores it in
// peer->timeout - confirm against the definition.
struct uwsgi_rb_timer *corerouter_reset_timeout(struct uwsgi_corerouter *, struct corerouter_peer *);