File: MessageServer.h

package info (click to toggle)
ruby-passenger 3.0.13debian-1%2Bdeb7u2
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 15,920 kB
  • sloc: cpp: 99,104; ruby: 18,098; ansic: 9,846; sh: 8,632; python: 141; makefile: 30
file content (581 lines) | stat: -rw-r--r-- 20,045 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
/*
 *  Phusion Passenger - http://www.modrails.com/
 *  Copyright (c) 2010 Phusion
 *
 *  "Phusion Passenger" is a trademark of Hongli Lai & Ninh Bui.
 *
 *  Permission is hereby granted, free of charge, to any person obtaining a copy
 *  of this software and associated documentation files (the "Software"), to deal
 *  in the Software without restriction, including without limitation the rights
 *  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 *  copies of the Software, and to permit persons to whom the Software is
 *  furnished to do so, subject to the following conditions:
 *
 *  The above copyright notice and this permission notice shall be included in
 *  all copies or substantial portions of the Software.
 *
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 *  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 *  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 *  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 *  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 *  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 *  THE SOFTWARE.
 */
#ifndef _PASSENGER_MESSAGE_SERVER_H_
#define _PASSENGER_MESSAGE_SERVER_H_

#include <string>
#include <vector>

#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <oxt/system_calls.hpp>
#include <oxt/dynamic_thread_group.hpp>

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <unistd.h>
#include <cerrno>
#include <cassert>

#include "Account.h"
#include "AccountsDatabase.h"
#include "Constants.h"
#include "FileDescriptor.h"
#include "MessageChannel.h"
#include "Logging.h"
#include "Exceptions.h"
#include "Utils/StrIntUtils.h"
#include "Utils/IOUtils.h"

namespace Passenger {

using namespace std;
using namespace boost;
using namespace oxt;


/* This source file follows the security guidelines written in Account.h. */

/**
 * Simple pluggable request/response messaging server framework.
 *
 * MessageServer implements a server with the following properties:
 * - It listens on a Unix socket. Socket creation and destruction is automatically handled.
 *   The socket is world-writable because a username/password authentication scheme is
 *   used to enforce security.
 * - Multithreaded: 1 thread per client.
 * - Designed for simple request/response cycles. That is, a client sends a request, and
 *   the server may respond with arbitrary data. The server does not respond sporadically,
 *   i.e. it only responds after a request.
 * - Requests are MessageIO array messages.
 * - Connections are authenticated. Connecting clients must send a username and password,
 *   which are then checked against an accounts database. The associated account is known
 *   throughout the entire connection life time so that it's possible to implement
 *   authorization features.
 *
 * MessageServer does not process messages by itself. Instead, one registers handlers
 * which handle message processing. This framework allows one to separate message
 * handling code by function, while allowing everything to listen on the same socket and
 * to use a common request parsing and dispatching codebase.
 *
 * A username/password authentication scheme was chosen over a file permission scheme because
 * experience has shown that the latter is inadequate. For example, the web server may
 * consist of multiple worker processes, each running as a different user. Although ACLs
 * can solve this problem as well, not every platform supports ACLs by default.
 *
 * <h2>Writing handlers</h2>
 * Handlers must inherit from MessageServer::Handler. They may implement newClient()
 * and must implement processMessage().
 *
 * When a new client is accepted, MessageServer will call newClient() on all handlers.
 * This method accepts one argument: a common client context object. This context object
 * contains client-specific information, such as its file descriptor. It cannot be
 * extended to store more information, but it is passed to every handler anyway,
 * hence the word "common" in its name.
 * newClient() is supposed to return a handler-specific client context object for storing
 * its own information, or a null pointer if it doesn't need to store anything.
 *
 * When a client sends a request, MessageServer iterates through all handlers and calls
 * processMessage() on each one, passing it the common client context and the
 * handler-specific client context. processMessage() may return either true or false;
 * true indicates that the handler processed the message, false indicates that
 * it did not. Iteration stops at the first handler that returns true.
 * If all handlers return false, i.e. the client sent a message that no handler recognizes,
 * then MessageServer will close the connection with the client.
 *
 * Handlers do not need to be thread-safe as long as they only operate on data in the
 * context objects. MessageServer ensures that context objects are not shared with other
 * threads.
 *
 * <h2>Usage example</h2>
 * This implements a simple ping server. Every time a "ping" request is sent, the
 * server responds with "pong" along with the number of times it had already sent
 * pong to the same client in the past.
 *
 * @code
 *   class PingHandler: public MessageServer::Handler {
 *   public:
 *       struct MyContext: public MessageServer::ClientContext {
 *           int count;
 *           
 *           MyContext() {
 *               count = 0;
 *           }
 *       };
 *       
 *       MessageServer::ClientContextPtr newClient(MessageServer::CommonClientContext &commonContext) {
 *           return MessageServer::ClientContextPtr(new MyContext());
 *       }
 *       
 *       bool processMessage(MessageServer::CommonClientContext &commonContext,
 *                           MessageServer::ClientContextPtr &specificContext,
 *                           const vector<string> &args)
 *       {
 *           if (args[0] == "ping") {
 *               MyContext *myContext = (MyContext *) specificContext.get();
 *               writeArrayMessage(commonContext.fd, "pong", toString(myContext->count).c_str(), NULL);
 *               myContext->count++;
 *               return true;
 *           } else {
 *               return false;
 *           }
 *       }
 *   };
 *   
 *   ...
 *   
 *   AccountsDatabasePtr accountsDatabase = ...; // used for authenticating clients
 *   MessageServer server("server.sock", accountsDatabase);
 *   server.addHandler(MessageServer::HandlerPtr(new PingHandler()));
 *   server.addHandler(MessageServer::HandlerPtr(new PingHandler()));
 *   server.mainLoop();
 * @endcode
 *
 * @ingroup Support
 */
class MessageServer {
public:
	/**
	 * The stack size that mainLoop() passes to the thread group whenever it
	 * creates a client handling thread.
	 * NOTE(review): presumably expressed in bytes (64 KB, or 96 KB on
	 * FreeBSD) - confirm against oxt::dynamic_thread_group::create_thread().
	 */
	static const unsigned int CLIENT_THREAD_STACK_SIZE =
		#ifdef __FreeBSD__
			// localtime() on FreeBSD needs some more stack space.
			1024 * 96;
		#else
			1024 * 64;
		#endif
	
	/**
	 * Interface for client context objects.
	 *
	 * The class declares nothing but a virtual destructor, so that objects of
	 * derived context types can be destroyed correctly through a pointer to
	 * ClientContext (for example through a ClientContextPtr).
	 */
	class ClientContext {
	public:
		virtual ~ClientContext() { }
	};
	
	typedef shared_ptr<ClientContext> ClientContextPtr;
	
	/**
	 * The per-client context that MessageServer itself maintains. It is
	 * handed to every handler, next to that handler's own context object.
	 */
	class CommonClientContext: public ClientContext {
	public:
		/** Socket file descriptor of the client connection. */
		FileDescriptor fd;
		
		/** Message channel that operates on the client's socket. */
		MessageChannel channel;
		
		/** The account that the client authenticated as. */
		AccountPtr account;
		
		
		CommonClientContext(FileDescriptor &theFd, AccountPtr &theAccount)
			: fd(theFd),
			  channel(theFd),
			  account(theAccount)
		{ }
		
		/** Returns a textual identification of this client: its file descriptor as a string. */
		string name() {
			return toString(fd);
		}
		
		/**
		 * Verifies that the client's account has every right in <tt>rights</tt>
		 * and tells the client about the outcome by sending it a message.
		 *
		 * @param rights The rights that are required.
		 * @throws SecurityException At least one of the required rights is missing.
		 * @throws SystemException Something went wrong while communicating with the client.
		 * @throws boost::thread_interrupted
		 */
		void requireRights(Account::Rights rights) {
			if (account->hasRights(rights)) {
				// Happy path: acknowledge and let the caller carry on.
				writeArrayMessage(fd, "Passed security", NULL);
				return;
			}
			
			P_TRACE(2, "Security error: insufficient rights to execute this command.");
			writeArrayMessage(fd, "SecurityException", "Insufficient rights to execute this command.", NULL);
			throw SecurityException("Insufficient rights to execute this command.");
		}
	};
	
	/**
	 * An abstract message handler class.
	 *
	 * The methods defined in this class are allowed to throw exceptions.
	 * Such exceptions are dealt with by MessageServer's client thread code;
	 * see clientHandlingMainLoop() for the exception types that are caught
	 * and logged before the connection to the client is closed.
	 */
	class Handler {
	public:
		virtual ~Handler() { }
		
		/**
		 * Called when a new client has connected to the MessageServer.
		 *
		 * This method is called after the client has authenticated itself.
		 *
		 * @param context Contains common client-specific information.
		 * @return A client context object for storing handler-specific client
		 *         information, or null. The default implementation returns null.
		 */
		virtual ClientContextPtr newClient(CommonClientContext &context) {
			return ClientContextPtr();
		}
		
		/**
		 * Called when a client has disconnected from the MessageServer. The
		 * default implementation does nothing.
		 *
		 * This method is called even if processMessage() throws an exception.
		 * It is however not called if newClient() throws an exception.
		 *
		 * @param context Contains common client-specific information.
		 * @param handlerSpecificContext The client context object as was returned
		 *     earlier by newClient().
		 */
		virtual void clientDisconnected(MessageServer::CommonClientContext &context,
		                                MessageServer::ClientContextPtr &handlerSpecificContext)
		{ }
		
		/**
		 * Called when a client has sent a request message.
		 *
		 * This method is called after newClient() is called.
		 *
		 * @param commonContext Contains common client-specific information.
		 * @param handlerSpecificContext The client context object as was returned
		 *     earlier by newClient().
		 * @param args The request message's contents.
		 * @return Whether this handler has processed the message. Return false
		 *         if the message is unrecognized.
		 */
		virtual bool processMessage(CommonClientContext &commonContext,
		                            ClientContextPtr &handlerSpecificContext,
		                            const vector<string> &args) = 0;
	};
	
	typedef shared_ptr<Handler> HandlerPtr;
	
protected:
	/** The filename of the server socket on which this MessageServer is listening. */
	string socketFilename;
	
	/** An accounts database, used for authenticating clients. */
	AccountsDatabasePtr accountsDatabase;
	
	/** The registered message handlers. */
	vector<HandlerPtr> handlers;
	
	/** The maximum number of microseconds that a client may spend on logging in.
	 * Clients that take longer are disconnected.
	 *
	 * @invariant loginTimeout != 0
	 */
	unsigned long long loginTimeout;
	
	/** The client threads. */
	dynamic_thread_group threadGroup;
	
	/** The server socket's file descriptor.
	 * @invariant serverFd >= 0
	 */
	int serverFd;
	
	
	/** Calls clientDisconnected() on all handlers when destroyed. */
	struct DisconnectEventBroadcastGuard {
		vector<HandlerPtr> &handlers;
		CommonClientContext &commonContext;
		vector<ClientContextPtr> &handlerSpecificContexts;
		
		DisconnectEventBroadcastGuard(vector<HandlerPtr> &_handlers,
		                              CommonClientContext &_commonContext,
		                              vector<ClientContextPtr> &_handlerSpecificContexts)
		: handlers(_handlers),
		  commonContext(_commonContext),
		  handlerSpecificContexts(_handlerSpecificContexts)
		{ }
		
		~DisconnectEventBroadcastGuard() {
			vector<HandlerPtr>::iterator handler_iter;
			vector<ClientContextPtr>::iterator context_iter;

			for (handler_iter = handlers.begin(), context_iter = handlerSpecificContexts.begin();
			     handler_iter != handlers.end();
			     handler_iter++, context_iter++) {
				(*handler_iter)->clientDisconnected(commonContext, *context_iter);
			}
		}
	};
	
	
	/**
	 * Create a server socket and set it up for listening. This socket will
	 * be world-writable.
	 *
	 * @throws RuntimeException
	 * @throws SystemException
	 * @throws boost::thread_interrupted
	 */
	void startListening() {
		TRACE_POINT();
		int ret;
		
		serverFd = createUnixServer(socketFilename.c_str());
		do {
			ret = chmod(socketFilename.c_str(),
				S_ISVTX |
				S_IRUSR | S_IWUSR | S_IXUSR |
				S_IRGRP | S_IWGRP | S_IXGRP |
				S_IROTH | S_IWOTH | S_IXOTH);
		} while (ret == -1 && errno == EINTR);
	}
	
	/**
	 * Runs the login handshake with a freshly connected client.
	 *
	 * The server first announces protocol version "1", then reads the
	 * username and the password as scalar messages (each with a maximum
	 * size) and looks the credentials up in the accounts database. All I/O
	 * shares a single time budget that starts at loginTimeout. The client is
	 * told about the outcome: "ok" on success, an error text otherwise.
	 *
	 * @param client The client's socket.
	 * @return A smart pointer to the client's Account object, or a null
	 *         pointer if authentication failed for whatever reason
	 *         (bad credentials, oversized input, connection closed,
	 *         I/O error or login timeout).
	 */
	AccountPtr authenticate(const FileDescriptor &client) {
		string username, password;
		// Wipes the password from memory when this function is left.
		MemZeroGuard passwordGuard(password);
		unsigned long long timeLeft = loginTimeout;
		
		try {
			writeArrayMessage(client, &timeLeft, "version", "1", NULL);
			
			bool usernameReceived;
			try {
				usernameReceived = readScalarMessage(client, username,
					MESSAGE_SERVER_MAX_USERNAME_SIZE, &timeLeft);
			} catch (const SecurityException &) {
				writeArrayMessage(client, &timeLeft, "The supplied username is too long.", NULL);
				return AccountPtr();
			}
			if (!usernameReceived) {
				return AccountPtr();
			}
			
			bool passwordReceived;
			try {
				passwordReceived = readScalarMessage(client, password,
					MESSAGE_SERVER_MAX_PASSWORD_SIZE, &timeLeft);
			} catch (const SecurityException &) {
				writeArrayMessage(client, &timeLeft, "The supplied password is too long.", NULL);
				return AccountPtr();
			}
			if (!passwordReceived) {
				return AccountPtr();
			}
			
			AccountPtr account = accountsDatabase->authenticate(username, password);
			// The password is no longer needed; don't keep it around.
			passwordGuard.zeroNow();
			if (account == NULL) {
				writeArrayMessage(client, &timeLeft, "Invalid username or password.", NULL);
				return AccountPtr();
			}
			
			writeArrayMessage(client, &timeLeft, "ok", NULL);
			return account;
		} catch (const SystemException &) {
			return AccountPtr();
		} catch (const TimeoutException &) {
			P_WARN("Login timeout");
			return AccountPtr();
		}
	}
	
	/**
	 * Announces a new client to every registered handler, in registration
	 * order, and collects the handler-specific context objects that they
	 * return.
	 *
	 * @param context The common context of the new client.
	 * @param handlerSpecificContexts Receives one entry per handler: the
	 *     result of that handler's newClient() call is appended to it.
	 */
	void broadcastNewClientEvent(CommonClientContext &context,
	                             vector<ClientContextPtr> &handlerSpecificContexts) {
		for (vector<HandlerPtr>::size_type i = 0; i < handlers.size(); i++) {
			ClientContextPtr handlerContext = handlers[i]->newClient(context);
			handlerSpecificContexts.push_back(handlerContext);
		}
	}
	
	/**
	 * Offers a request message to the handlers, one after another in
	 * registration order, until one of them reports that it has processed
	 * the message.
	 *
	 * @param commonContext The common context of the client.
	 * @param handlerSpecificContexts The per-handler contexts; entry i
	 *     belongs to handler i.
	 * @param args The request message's contents.
	 * @return true if some handler processed the message, false if none did.
	 */
	bool processMessage(CommonClientContext &commonContext,
	                    vector<ClientContextPtr> &handlerSpecificContexts,
	                    const vector<string> &args) {
		for (vector<HandlerPtr>::size_type i = 0; i < handlers.size(); i++) {
			const bool handled = handlers[i]->processMessage(commonContext,
				handlerSpecificContexts[i], args);
			if (handled) {
				return true;
			}
		}
		return false;
	}
	
	/**
	 * Logs that a client sent a message which no handler recognized. The
	 * log entry names the command (the first element of the message, or
	 * "(null)" for an empty message) and the number of message elements.
	 *
	 * @param commonContext The common context of the client (not used here).
	 * @param args The contents of the unrecognized message.
	 */
	void processUnknownMessage(CommonClientContext &commonContext, const vector<string> &args) {
		TRACE_POINT();
		const string name = args.empty() ? string("(null)") : args[0];
		P_TRACE(2, "A MessageServer client sent an invalid command: "
			<< name << " (" << args.size() << " elements)");
	}
	
	/**
	 * The main function for a thread which handles a client.
	 *
	 * The client is authenticated first; if that fails the function returns
	 * right away. Otherwise every handler gets to create its handler-specific
	 * context, after which request messages are read and dispatched to the
	 * handlers until the client closes the connection, the client sends a
	 * message that no handler recognizes, or interruption of the thread is
	 * requested. When this phase ends - normally or through an exception -
	 * the handlers are notified of the disconnection.
	 *
	 * Thread interruption and all exceptions derived from std::exception
	 * (which includes oxt::tracable_exception) are caught and logged here,
	 * so that a misbehaving handler cannot make an exception escape from
	 * the thread's entry function.
	 *
	 * @param client The socket of the client that this thread serves.
	 */
	void clientHandlingMainLoop(FileDescriptor client) {
		TRACE_POINT();
		vector<string> args;
		
		P_TRACE(4, "MessageServer client thread " << (int) client << " started.");
		
		try {
			AccountPtr account(authenticate(client));
			if (account == NULL) {
				P_TRACE(4, "MessageServer client thread " << (int) client << " exited.");
				return;
			}
			
			CommonClientContext commonContext(client, account);
			vector<ClientContextPtr> handlerSpecificContexts;
			broadcastNewClientEvent(commonContext, handlerSpecificContexts);
			// Installed only after all newClient() calls succeeded, so that
			// clientDisconnected() is not called if newClient() throws.
			DisconnectEventBroadcastGuard dguard(handlers, commonContext, handlerSpecificContexts);
			
			while (!this_thread::interruption_requested()) {
				UPDATE_TRACE_POINT();
				if (!readArrayMessage(commonContext.fd, args)) {
					// Client closed connection.
					break;
				}
				
				P_TRACE(4, "MessageServer client " << commonContext.name() <<
					": received message: " << toString(args));
				
				UPDATE_TRACE_POINT();
				if (!processMessage(commonContext, handlerSpecificContexts, args)) {
					processUnknownMessage(commonContext, args);
					break;
				}
				args.clear();
			}
			
			P_TRACE(4, "MessageServer client thread " << (int) client << " exited.");
			client.close();
		} catch (const boost::thread_interrupted &) {
			P_TRACE(2, "MessageServer client thread " << (int) client << " interrupted.");
		} catch (const tracable_exception &e) {
			P_TRACE(2, "An error occurred in a MessageServer client thread " << (int) client << ":\n"
				<< "   message: " << toString(args) << "\n"
				<< "   exception: " << e.what() << "\n"
				<< "   backtrace:\n" << e.backtrace());
		} catch (const std::exception &e) {
			// Handlers may throw exceptions that carry no backtrace
			// (e.g. std::bad_alloc or std::runtime_error). Log those as well
			// instead of letting them escape from the thread function.
			P_TRACE(2, "An error occurred in a MessageServer client thread " << (int) client << ":\n"
				<< "   message: " << toString(args) << "\n"
				<< "   exception: " << e.what());
		}
	}
	
public:
	/**
	 * Creates a new MessageServer object.
	 * The actual server main loop is not started until you call mainLoop().
	 *
	 * @param socketFilename The socket filename on which this MessageServer
	 *                       should be listening.
	 * @param accountsDatabase An accounts database for this server, used for
	 *                         authenticating clients.
	 * @throws RuntimeException Something went wrong while setting up the server socket.
	 * @throws SystemException Something went wrong while setting up the server socket.
	 * @throws boost::thread_interrupted
	 */
	MessageServer(const string &socketFilename, AccountsDatabasePtr accountsDatabase) {
		this->socketFilename   = socketFilename;
		this->accountsDatabase = accountsDatabase;
		loginTimeout = 2000000;
		startListening();
	}
	
	/**
	 * Closes the server socket and removes the socket file from the
	 * filesystem. The return values of both calls are ignored.
	 */
	~MessageServer() {
		// NOTE(review): presumably keeps the two system calls below from being
		// aborted by an oxt thread interruption - confirm in oxt/system_calls.hpp.
		this_thread::disable_syscall_interruption dsi;
		syscalls::close(serverFd);
		syscalls::unlink(socketFilename.c_str());
	}
	
	/**
	 * Returns the filename of the server socket, i.e. the filename that was
	 * passed to the constructor.
	 */
	string getSocketFilename() const {
		return socketFilename;
	}
	
	/**
	 * Starts the server main loop. This method will loop forever until some
	 * other thread interrupts the calling thread, or until an exception is raised.
	 *
	 * @throws SystemException Unable to accept a new connection. If this is a
	 *                         non-fatal error then you may call mainLoop() again
	 *                         to restart the server main loop.
	 * @throws boost::thread_interrupted The calling thread has been interrupted.
	 */
	void mainLoop() {
		TRACE_POINT();
		while (true) {
			this_thread::interruption_point();
			sockaddr_un addr;
			socklen_t len = sizeof(addr);
			FileDescriptor fd;
			
			UPDATE_TRACE_POINT();
			fd = syscalls::accept(serverFd, (struct sockaddr *) &addr, &len);
			if (fd == -1) {
				throw SystemException("Unable to accept a new client", errno);
			}
			
			UPDATE_TRACE_POINT();
			this_thread::disable_interruption di;
			this_thread::disable_syscall_interruption dsi;
			
			function<void ()> func(boost::bind(&MessageServer::clientHandlingMainLoop,
				this, fd));
			string name = "MessageServer client thread ";
			name.append(toString(fd));
			threadGroup.create_thread(func, name, CLIENT_THREAD_STACK_SIZE);
		}
	}
	
	/**
	 * Registers a new handler. The handler is appended to the list of
	 * handlers, so handlers are notified of new clients and offered request
	 * messages in the order in which they were registered.
	 *
	 * @param handler The handler to register.
	 * @pre The main loop isn't running.
	 */
	void addHandler(HandlerPtr handler) {
		handlers.push_back(handler);
	}
	
	/**
	 * Sets the maximum number of microseconds that clients may spend on logging in.
	 * Clients that take longer are disconnected.
	 *
	 * @param timeout The new login timeout. Must be non-zero; this is
	 *                verified with assert().
	 * @pre timeout != 0
	 * @pre The main loop isn't running.
	 */
	void setLoginTimeout(unsigned long long timeout) {
		assert(timeout != 0);
		loginTimeout = timeout;
	}
};

typedef shared_ptr<MessageServer> MessageServerPtr;

} // namespace Passenger

#endif /* _PASSENGER_MESSAGE_SERVER_H_ */