File: example_server.cpp

package info (click to toggle)
r-cran-rinside 0.2.19-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 668 kB
  • sloc: cpp: 3,310; ansic: 117; xml: 57; ruby: 34; makefile: 2
file content (312 lines) | stat: -rw-r--r-- 8,580 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
/*
 * Copyright (c) 2014 Christian Authmann
 */

#include "common/binarystream.h"
#include "common/constants.h"
#include "datatypes/foo.h"
#include "datatypes/bar.h"

#include <cerrno> // errno, EINTR, ...
#include <cstdlib>
#include <cstdio>
#include <string.h> // memset()
#include <map>
#include <atomic>
#include <iostream>
#include <fstream>
#include <stdexcept>

#include <chrono> // for sleeping
#include <thread>

#include <time.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <poll.h>
#include <signal.h>
#include <unistd.h> // fork(), getpid(), close(), unlink()

/*
 * This is an example server. It sets up an R environment and a listening socket, then waits
 * for clients and fork()s.
 * The actual communication with clients is handled by server/rinsideserver.cpp.
 * This file only does initialization and process tracking.
 */

/*
 * Since the server fork()s a lot, we would like to prepend the pid to each logged line.
 *
 * LOG(fmt, ...) takes printf-style arguments and writes "<pid>: <message>\n" to stderr.
 * The body is wrapped in do { ... } while (0) so that the macro behaves like a single
 * statement: `if (x) LOG("..."); else ...` compiles and binds the else as expected.
 */
#define LOG(...) do { fprintf(stderr, "%d: ", (int) getpid()); fprintf(stderr, __VA_ARGS__); fprintf(stderr, "\n"); } while (0)

/*
 * If an R script gets stuck in an infinite loop, we need to stop it eventually. We thus define a global
 * timeout after which it gets killed.
 *
 * It would be desirable to allow the client to specify the timeout. Unfortunately, only the fork()ed
 * child process can communicate with the client, but the parent process needs to know about the timeout,
 * making this more complicated than one might expect.
 */
constexpr int TIMEOUT_SECONDS = 10 * 60; // ten minutes, expressed in seconds

#include <RcppCommon.h>

#include "datatypes/foo_rcpp_wrapper_declarations.h"
#include "datatypes/bar_rcpp_wrapper_declarations.h"

#include <Rcpp.h>
#include <RInside.h>
#if !defined(RINSIDE_CALLBACKS)
#error "RInside was not compiled with RINSIDE_CALLBACKS"
#endif

#include "datatypes/foo_rcpp_wrapper_definitions.h"
#include "datatypes/bar_rcpp_wrapper_definitions.h"


/*
 * The RInsideServer must be included AFTER RInside and all wrappers are included
 */
#include "server/rinsideserver.h"


/*
 * Three-way comparison of two timespec values.
 *
 * Returns -1 if t1 is earlier than t2, 1 if t1 is later than t2 and 0 if both are equal.
 * Seconds are compared first; nanoseconds only decide when the seconds are equal.
 */
int cmpTimespec(const struct timespec &t1, const struct timespec &t2) {
	if (t1.tv_sec != t2.tv_sec)
		return (t1.tv_sec < t2.tv_sec) ? -1 : 1;

	if (t1.tv_nsec != t2.tv_nsec)
		return (t1.tv_nsec < t2.tv_nsec) ? -1 : 1;

	return 0;
}


/*
 * Handler for the signals registered in main(): logs which signal arrived and
 * terminates the process, using the signal number as the exit status.
 *
 * NOTE(review): fprintf() (used by LOG) and exit() are not on POSIX's list of
 * async-signal-safe functions. Behavior is kept as-is here; consider write()/_exit()
 * if the handler is ever seen to hang.
 */
void signal_handler(int signum) {
	const int exit_status = signum;
	LOG("Caught signal %d, exiting", exit_status);
	std::exit(exit_status);
}


/*
 * Entry point of the example server.
 *
 * Registers the transferable types, installs the signal handlers, starts the embedded R
 * environment, opens the listening unix socket and then loops forever: reap finished
 * children, kill children that ran past their deadline, accept a new client and fork()
 * a child to serve it.
 *
 * The loop never ends on its own; the process terminates through exit() on a fatal
 * setup error or from the signal handler.
 */
int main()
{
	// register our custom types with the server
	RInsideServer::registerDefaultTypes();
	RInsideServer::registerType<Foo>();
	RInsideServer::registerType<Bar>();


	// Install signal handlers
	int signals[] = {SIGHUP, SIGINT, 0};
	for (int i=0;signals[i];i++) {
		if (signal(signals[i], signal_handler) == SIG_ERR) {
			perror("Cannot install signal handler");
			exit(1);
		}
		else
			printf("Signal handler for %d installed\n", signals[i]);
	}
	signal(SIGPIPE, SIG_IGN);

	/*
	 * If R prints anything to the console, we must catch it.
	 * Instead of redirecting stdout (which we might want to use for diagnostics or logging), we
	 * use RInside's callbacks. They're marked experimental and aren't enabled by default, but in our
	 * tests, they worked just fine.
	 */
	RInsideCallbacks *Rcallbacks = new RInsideCallbacks();
	// Initialize R environment
	printf("...loading R\n");
	RInside R;
	R.set_callbacks( Rcallbacks );

	printf("...loading packages\n");
	try {
		/*
		 * Loading packages is slow. We want to load all common packages once on
		 * server startup, before the fork()
		 *
		 * For example, sandboxR might be useful to restrict the damage an R script can do.
		 * See https://github.com/rapporter/sandboxR
		 */
		//R.parseEval("library(\"sandboxR\")");
	}
	catch (const std::exception &e) {
		printf("error loading packages: %s\nR's output:\n%s", e.what(), Rcallbacks->getConsoleOutput().c_str());
		exit(5);
	}
	Rcallbacks->resetConsoleOutput();

	printf("R is ready\n");

	// get rid of leftover sockets
	unlink(ris_socket_address);

	// create a fresh socket
	int listen_fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (listen_fd < 0) {
		perror("socket() failed");
		exit(1);
	}

	// bind socket
	struct sockaddr_un server_addr;
	memset((void *) &server_addr, 0, sizeof(server_addr));
	server_addr.sun_family = AF_UNIX;
	// sun_path is a fixed-size array; refuse a path that would not fit including its terminating NUL
	if (strlen(ris_socket_address) >= sizeof(server_addr.sun_path)) {
		fprintf(stderr, "socket path too long: %s\n", ris_socket_address);
		exit(1);
	}
	strcpy(server_addr.sun_path, ris_socket_address);
	if (bind(listen_fd, (sockaddr *) &server_addr, sizeof(server_addr)) < 0) {
		perror("bind() failed");
		exit(1);
	}

	// adjust this for your own needs..
	chmod(ris_socket_address, 0777);


	/*
	 * We need to keep track of all the children to enforce timeouts. This map
	 * contains pids of all child processes and their end times.
	 */
	std::map<pid_t, timespec> running_clients;

	// Start listening and fork()
	if (listen(listen_fd, 5) < 0) {
		perror("listen() failed");
		exit(1);
	}
	printf("Socket started, listening..\n");

	while (true) {
		/*
		 * Try to reap all child processes that exited on their own. Not only
		 * will this keep our running_clients map small, it will also allow the
		 * OS to remove any "zombie" processes.
		 */
		int status;
		pid_t exited_pid;
		while ((exited_pid = waitpid(-1, &status, WNOHANG)) > 0) {
			LOG("Client %d no longer exists", (int) exited_pid);
			running_clients.erase(exited_pid);
		}
		/*
		 * Now check if any children exceeded their timeout. Kill them.
		 */
		struct timespec current_t;
		clock_gettime(CLOCK_MONOTONIC, &current_t);

		for (auto it = running_clients.begin(); it != running_clients.end(); ) {
			const timespec &timeout_t = it->second;
			if (cmpTimespec(timeout_t, current_t) < 0) {
				auto timeouted_pid = it->first;
				LOG("Client %d gets killed due to timeout", (int) timeouted_pid);

				/*
				 * We kill the client using SIGHUP. Since we installed a signal handler, and signal handlers
				 * are kept during fork(), this should be enough to end it.
				 * That is, unless an R package removes the signal handler. In that case, we'd need to keep
				 * tracking the process and force a SIGKILL if it refuses to exit.
				 */
				if (kill(timeouted_pid, SIGHUP) < 0) {
					perror("kill() failed");
					// keep the entry so the kill is attempted again on the next pass
					++it;
				}
				else {
					// erase() hands back the iterator following the removed entry
					it = running_clients.erase(it);
				}
			}
			else {
				++it;
			}

		}

		/*
		 * Wait for new connections.
		 *
		 * We may want to limit the amount of clients running at the same time.
		 */
		if (running_clients.size() > 10) {
			std::this_thread::sleep_for(std::chrono::milliseconds(5000));
			continue;
		}

		struct pollfd pollfds[1];
		pollfds[0].fd = listen_fd;
		pollfds[0].events = POLLIN;

		int poll_res = poll(pollfds, /* count = */ 1, /* timeout in ms = */ 5000);
		if (poll_res < 0) {
			// being interrupted by a signal is not a failure; redo the housekeeping and poll again
			if (errno == EINTR)
				continue;
			perror("poll() failed");
			exit(1);
		}
		/*
		 * If no new connection is made within 5 seconds, we repeat the loop and check
		 * for finished or timeouted children again.
		 */
		if (poll_res == 0)
			continue;
		if ((pollfds[0].revents & POLLIN) == 0)
			continue;

		struct sockaddr_un client_addr;
		socklen_t client_addr_len = sizeof(client_addr);
		int client_fd = accept(listen_fd, (struct sockaddr *) &client_addr, &client_addr_len);
		if (client_fd < 0) {
			// transient conditions: nothing to accept after all, interrupted, or the client already gave up
			if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR || errno == ECONNABORTED)
				continue;
			perror("accept() failed");
			exit(1);
		}

		/*
		 * Flush stdout before fork(): anything still sitting in the stdio buffer would be
		 * copied into the child and written a second time when the child calls exit().
		 */
		fflush(stdout);

		// fork
		pid_t pid = fork();
		if (pid < 0) {
			/*
			 * fork() can fail temporarily (e.g. process or memory limits). Drop this one
			 * client instead of taking the whole server, and all running clients' supervisor, down.
			 */
			perror("fork() failed");
			close(client_fd);
			continue;
		}

		if (pid == 0) {
			/*
			 * This is the child process.
			 *
			 * If the child process needs to drop any privileges the server may have had,
			 * this is an excellent time to do so.
			 * Whether it's a chroot, seccomp-bpf or a MAC framework like SELinux or AppArmor.
			 *
			 * Note that neither is an excuse to run the parent process unrestricted; creating
			 * a new restricted user for the server seems wise.
			 */
			close(listen_fd);
			LOG("Client starting");
			auto start_c = clock();
			struct timespec start_t;
			clock_gettime(CLOCK_MONOTONIC, &start_t);
			try {
				BinaryStream stream(client_fd, client_fd);
				RInsideServer ris(stream, R, *Rcallbacks);
				ris.run();
			}
			catch (const std::exception &e) {
				LOG("Exception: %s", e.what());
			}
			auto end_c = clock();
			struct timespec end_t;
			clock_gettime(CLOCK_MONOTONIC, &end_t);

			// CPU time from clock(), wall time from the monotonic clock
			double c = (double) (end_c - start_c) / CLOCKS_PER_SEC;
			double t = (double) (end_t.tv_sec - start_t.tv_sec) + (double) (end_t.tv_nsec - start_t.tv_nsec) / 1000000000;

			LOG("Client finished, %.3fs real, %.3fs CPU", t, c);

			exit(0);
		}
		else {
			// This is the parent process
			close(client_fd);

			// remember when this child has to be finished: now + TIMEOUT_SECONDS
			struct timespec timeout_t;
			clock_gettime(CLOCK_MONOTONIC, &timeout_t);
			timeout_t.tv_sec += TIMEOUT_SECONDS;
			running_clients[pid] = timeout_t;
		}
	}
}