File: rpc.c

package info (click to toggle)
uwsgi 2.0.31-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,564 kB
  • sloc: ansic: 87,066; python: 7,004; cpp: 1,133; java: 708; perl: 678; sh: 585; ruby: 555; makefile: 148; xml: 130; cs: 121; objc: 37; php: 28; erlang: 20; javascript: 11
file content (222 lines) | stat: -rw-r--r-- 5,359 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
#include "uwsgi.h"

extern struct uwsgi_server uwsgi;

/*
 * Register (or re-register) an RPC function in the slice of the shared RPC
 * table owned by the calling process.
 *
 * name   - NUL-terminated function name, must be shorter than UMAX8 bytes
 * plugin - plugin whose ->rpc hook will be used to run the function
 * args   - value stored in the entry's "args" field
 * func   - opaque callable handed back to the plugin's ->rpc hook
 *
 * The table is split in slices of uwsgi.rpc_max entries, slice N belonging to
 * the process with mywid == N. If "name" is already present in the caller's
 * slice the entry is overwritten, otherwise a new entry is appended.
 * When called with mywid == 0 the slice 0 (and its counter) is then copied over
 * the slices of workers 1..numproc, replacing whatever they contained.
 *
 * Returns 0 on success, -1 if the caller is not allowed to register, the name
 * is too long or the caller's slice is full.
 */
int uwsgi_register_rpc(char *name, struct uwsgi_plugin *plugin, uint8_t args, void *func) {

	struct uwsgi_rpc *urpc = NULL;
	int ret = -1;

	if (uwsgi.mywid == 0 && uwsgi.workers[0].pid != uwsgi.mypid) {
		uwsgi_log("only the master and the workers can register RPC functions\n");
		return -1;
	}

	size_t name_len = strlen(name);
	if (name_len >= UMAX8)  {
		uwsgi_log("the supplied RPC name string is too long, max size is %d\n", UMAX8-1);
		return -1;
	}

	uwsgi_lock(uwsgi.rpc_table_lock);

	// first slot of the slice owned by this process
	size_t base = uwsgi.mywid * uwsgi.rpc_max;
	uint64_t count = uwsgi.shared->rpc_count[uwsgi.mywid];

	// first check if a function is already registered
	uint64_t i;
	for(i=0;i<count;i++) {
		if (!strcmp(name, uwsgi.rpc_table[base + i].name)) {
			urpc = &uwsgi.rpc_table[base + i];
			break;
		}
	}

	// not found: take the next free slot (if any)
	if (!urpc && count < uwsgi.rpc_max) {
		urpc = &uwsgi.rpc_table[base + count];
		uwsgi.shared->rpc_count[uwsgi.mywid]++;
	}

	if (urpc) {
		// copy the terminator too, so the entry does not depend on the slot
		// being pre-zeroed (name_len + 1 <= UMAX8 thanks to the check above;
		// assumes urpc->name holds UMAX8 bytes, as that check implies)
		memcpy(urpc->name, name, name_len + 1);
		urpc->plugin = plugin;
		urpc->args = args;
		urpc->func = func;
		urpc->shared = uwsgi.mywid == 0 ? 1 : 0;

		ret = 0;
		if (uwsgi.mywid == 0) {
			uwsgi_log("registered shared/inherited RPC function \"%s\"\n", name);
		}
		else {
			uwsgi_log("registered RPC function \"%s\" on worker %d\n", name, uwsgi.mywid);
		}
	}
	else {
		uwsgi_log("unable to register RPC function \"%s\": the RPC table is full\n", name);
	}

	// implement cow: entries registered with mywid == 0 are propagated to every worker slice
	if (uwsgi.mywid == 0) {
		int wid;
		for(wid=1;wid<=uwsgi.numproc;wid++) {
			uwsgi.shared->rpc_count[wid] = uwsgi.shared->rpc_count[0];
			size_t worker_base = wid * uwsgi.rpc_max;
			memcpy(&uwsgi.rpc_table[worker_base], uwsgi.rpc_table, sizeof(struct uwsgi_rpc) * uwsgi.rpc_max);
		}
	}

	uwsgi_unlock(uwsgi.rpc_table_lock);

	return ret;
}

/*
 * Run the RPC function called "name" from the calling process's slice of the
 * RPC table.
 *
 * argc/argv/argvs are passed untouched to the ->rpc hook of the plugin that
 * registered the function, together with "output".
 *
 * Returns whatever the plugin hook returns, or 0 when the function is not
 * registered or its plugin has no ->rpc hook.
 */
uint64_t uwsgi_rpc(char *name, uint8_t argc, char *argv[], uint16_t argvs[], char **output) {

	// index of the first entry belonging to this process
	int base = (uwsgi.mywid * uwsgi.rpc_max);
	uint64_t idx;

	for (idx = 0; idx < uwsgi.shared->rpc_count[uwsgi.mywid]; idx++) {
		struct uwsgi_rpc *candidate = &uwsgi.rpc_table[base + idx];

		// skip unnamed entries and names that do not match
		if (candidate->name[0] == 0) {
			continue;
		}
		if (strcmp(candidate->name, name)) {
			continue;
		}

		// first match wins: delegate to the owning plugin (if it can run RPC)
		if (!candidate->plugin->rpc) {
			return 0;
		}
		return candidate->plugin->rpc(candidate->func, argc, argv, argvs, output);
	}

	return 0;
}

/*
 * Key/value callback used while parsing an RPC response packet.
 * "data" points to a size_t that receives the value of the CONTENT_LENGTH
 * item (converted by uwsgi_str_num); every other key is ignored.
 */
static void rpc_context_hook(char *key, uint16_t kl, char *value, uint16_t vl, void *data) {

	// only CONTENT_LENGTH is of interest here
	if (uwsgi_strncmp(key, kl, "CONTENT_LENGTH", 14)) {
		return;
	}

	size_t *content_length = (size_t *) data;
	*content_length = uwsgi_str_num(value, vl);
}

/*
 * Call the RPC function "func", locally or on a remote node.
 *
 * node  - address of the remote instance; NULL or "" means "use the local RPC table"
 * func  - NUL-terminated name of the function to call
 * argc  - number of arguments
 * argv  - argument buffers
 * argvs - size in bytes of each argument (in the remote path only the first
 *         argvs[i] bytes of argv[i] are sent)
 * len   - out: size of the response; it is reset to 0 on entry
 *
 * Local call: *len is set to the value returned by uwsgi_rpc() and the buffer
 * filled by the function (possibly NULL) is returned.
 *
 * Remote call: a request packet (4 bytes header followed by 16bit little-endian
 * length-prefixed items: func, then each argument) is sent to the node. The
 * response is returned in a heap buffer (uwsgi_malloc/realloc) that is not
 * freed here on success. NULL is returned on any error or when the
 * response is empty.
 */
char *uwsgi_do_rpc(char *node, char *func, uint8_t argc, char *argv[], uint16_t argvs[], uint64_t * len) {

	uint8_t i;
	uint16_t ulen;
	struct uwsgi_header *uh = NULL;
	char *buffer = NULL;

	*len = 0;

	// no node specified: run the function from the local RPC table
	if (node == NULL || !strcmp(node, "")) {
		if (!uwsgi.rpc_table) {
			uwsgi_log("local rpc subsystem is still not initialized !!!\n");
			return NULL;
		}
		*len = uwsgi_rpc(func, argc, argv, argvs, &buffer);
		return buffer;
	}


	// connect to node (async way)
	int fd = uwsgi_connect(node, 0, 1);
	if (fd < 0)
		return NULL;

	// wait for connection;
	int ret = uwsgi.wait_write_hook(fd, uwsgi.socket_timeout);
	if (ret <= 0) {
		close(fd);
		return NULL;
	}

	// prepare a uwsgi array: every item is a 2 bytes length followed by its payload
	size_t buffer_size = 2 + strlen(func);

	for (i = 0; i < argc; i++) {
		buffer_size += 2 + argvs[i];
	}

	// the packet size must fit the 16bit pktsize field of the header
	if (buffer_size > 0xffff) {
		uwsgi_log("RPC packet length overflow!!! Must be less than or equal to 65535, have %llu\n", (unsigned long long) buffer_size);
		// the socket is already connected: release it before giving up
		close(fd);
		return NULL;
	}

	// allocate the whole buffer (header + body)
	buffer = uwsgi_malloc(4+buffer_size);

	// set the uwsgi header (modifier1 173 is the value used here for RPC requests)
	uh = (struct uwsgi_header *) buffer;
	uh->modifier1 = 173;
	uh->pktsize = (uint16_t) buffer_size;
	uh->modifier2 = 0;

	// add func to the array
	char *bufptr = buffer + 4;
	ulen = strlen(func);
	*bufptr++ = (uint8_t) (ulen & 0xff);
	*bufptr++ = (uint8_t) ((ulen >> 8) & 0xff);
	memcpy(bufptr, func, ulen);
	bufptr += ulen;

	// then every argument, in order
	for (i = 0; i < argc; i++) {
		ulen = argvs[i];
		*bufptr++ = (uint8_t) (ulen & 0xff);
		*bufptr++ = (uint8_t) ((ulen >> 8) & 0xff);
		memcpy(bufptr, argv[i], ulen);
		bufptr += ulen;
	}

	// ok the request is ready, let's send it in non blocking way
	if (uwsgi_write_true_nb(fd, buffer, buffer_size+4, uwsgi.socket_timeout)) {
		goto error;
	}

	// ok time to wait for the response in non blocking way
	// (the request buffer is reused, rlen tells the reader its current size)
	size_t rlen = buffer_size+4;
	uint8_t modifier2 = 0;
	if (uwsgi_read_with_realloc(fd, &buffer, &rlen, uwsgi.socket_timeout, NULL, &modifier2)) {
		goto error;
	}

	// 64bit response ?
	// with modifier2 == 5 the packet only carries a CONTENT_LENGTH item,
	// the real payload follows as raw bytes on the socket
	if (modifier2 == 5) {
		size_t content_len = 0;
		if (uwsgi_hooked_parse(buffer, rlen, rpc_context_hook, &content_len )) goto error;

		if (content_len > rlen) {
			// keep the old pointer until realloc succeeds, so "error" can still free it
			char *tmp_buf = realloc(buffer, content_len);
			if (!tmp_buf) goto error;
			buffer = tmp_buf;
		}

		rlen = content_len;

		// read the raw value from the socket
		if (uwsgi_read_whole_true_nb(fd, buffer, rlen, uwsgi.socket_timeout)) {
			goto error;
		}
	}

	close(fd);
	*len = rlen;
	// an empty response is reported as a failure (fd is already closed)
	if (*len == 0) {
		goto error2;
	}
	return buffer;

error:
	close(fd);
error2:
	free(buffer);
	return NULL;

}


/*
 * Allocate the shared RPC structures through uwsgi_calloc_shared():
 * a table of rpc_max entries and one counter for each of the numproc+1
 * processes (index 0 plus workers 1..numproc).
 */
void uwsgi_rpc_init() {
	size_t table_bytes = (sizeof(struct uwsgi_rpc) * uwsgi.rpc_max) * (uwsgi.numproc+1);
	size_t counters_bytes = sizeof(uint64_t) * (uwsgi.numproc+1);

	uwsgi.rpc_table = uwsgi_calloc_shared(table_bytes);
	uwsgi.shared->rpc_count = uwsgi_calloc_shared(counters_bytes);
}