File: commanddispatcher.cpp

package info (click to toggle)
dc-qt 0.2.0.alpha-4
  • links: PTS
  • area: main
  • in suites: lenny
  • size: 1,948 kB
  • ctags: 5,361
  • sloc: cpp: 28,936; makefile: 19
file content (266 lines) | stat: -rw-r--r-- 7,006 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
/*
 *  CommandDispatcher.cpp
 *  RpcDriver
 *
 *  Created by Mikael Gransell on 1/31/06.
 *  Copyright 2006 __MyCompanyName__. All rights reserved.
 *
 */

#include <iostream>
using namespace std;

#include "commanddispatcher.h"
#include "rpcexception.h"
#include "types.h"

namespace rpc {

void CommandDispatcher::setClientAuthenticated(int clientId,bool authenticated)
{
	if(authenticated) authenticatedClients.insert(clientId);
	else authenticatedClients.erase(clientId);
}

bool CommandDispatcher::isClientAuthenticated(int clientId)
{
	if(!requireAuthentication) return true;
	return authenticatedClients.find(clientId)!=authenticatedClients.end();
}

void rpc::CommandDispatcher::registerCommand( RpcCommandHandlerPtr cmd )
{
	// Lock the mutex
	boost::mutex::scoped_lock lock( handlerListMutex );
	
	// Add the item. If the item does not exist a new list will be created
	// and the item added to it.
	registeredCommands[cmd->getCmdName()].push_back( cmd );
}

void rpc::CommandDispatcher::handleCommand( int sender, CmdInputBufferPtr cmd )
{
	try {
		
		// Create a DataInputStream for easy reading
		DataInputStreamPtr cmdStream( new DataInputStream( cmd ) );
		
		// Will hold the command
		list<boost::any> params;
		
		// The the id of the sender
		params.push_back( sender );
		// Get name of the command. 
		// This should allways be the first param in the command stream
		string cmdName = retrieveCommandName( cmdStream );
		//cout << "handleCommand: " << cmdName << endl;

		// Add the name as the first param in the list
		params.push_back( cmdName );
		// Parse the params of the command. 
		// retriveCommandName should have advanced the buffer to the first param
		parseCommandParams( cmdStream, params );
		
		// Add the command to the buffer
		commandBuffer.enqueue( params );
		
		// Signal waiting thread
		commandAvailable.notify_one();
	}
	catch( const RpcException& rpcExcept ) {
		
		cout << "Could not handle command. Error: " << rpcExcept.getReason() << endl;
	}
}

void rpc::CommandDispatcher::waitForCommand()
{
	try {
		
		// To monitor the addition of commands in the incomming buffer
		boost::mutex::scoped_lock lock( commandMonitor );
		
		bool stop = false;
		while( !stop ) {
			
			// Wait for something to get placed in the queue
			commandAvailable.wait( lock );
			
			// Handle all the commands in the list
			while( !commandBuffer.empty() ) {
								
				try {
					
					// Get first command
					list<boost::any> cmdParams = commandBuffer.dequeue();
					
					// Second should be the id of the client that sent this command
					int sender = boost::any_cast<int>(cmdParams.front());
					cmdParams.pop_front();
					// First element should be the command name
					string cmdName = boost::any_cast<string>(cmdParams.front());
					cmdParams.pop_front();
					if(!requireAuthentication || isClientAuthenticated(sender) || cmdName=="authenticate" )
						dispatchCommand( cmdName, sender, cmdParams );
				}
				catch( const boost::bad_any_cast& castExcept ) {
					
					cout << "Could not cast first parameter to command name. Error: " <<
							castExcept.what() << endl;
				}
				catch( const std::out_of_range& rangeExcept ) {
					
					cout << "Could not get command. Error: " << rangeExcept.what() << endl;
				}
				catch( const RpcException& except ) {
					
					cout << "Could not dispatch command. Error: " << except.getReason() << endl;
				}
			}
		}
	}
	catch( std::bad_alloc& allocExcept ) {
		
		cout << "Could not allocate memory for buffer. Error: " << allocExcept.what() << endl;
	}
	catch( const boost::lock_error& lkExcept ) {
		
		cout << "Could not lock command monitor mutex. Error: " << lkExcept.what() << endl;
	}
	catch(...) {
		// Make sure we catch everything so that we dont fuck up the entire app
		cout << "Unknown exception about to leave thread." << endl;
	}
}

boost::any CommandDispatcher::getNextParam(DataInputStreamPtr paramStream, 
										   list<boost::any>& params) 
{
	// Hold the next argument
	boost::any nextParam;
	int size;
	list<boost::any> listElements;
	// Read the first byte containing the type of the next param
	char type = paramStream->readByte();
	
	switch( type ) {
		
		case eRpcParamTypeInt:
			nextParam = paramStream->readInt();
			break;
			
		case eRpcParamTypeByte:
			nextParam = paramStream->readByte();
			break;
			
		case eRpcParamTypeShort:
			nextParam = paramStream->readShort();
			break;
			
		case eRpcParamTypeBoolean:
			nextParam = paramStream->readBool();
			break;
			
		case eRpcParamTypeLong:
			nextParam = paramStream->readLong();
			break;
			
		case eRpcParamTypeString:
			nextParam = paramStream->readUTF();
			break;
		case eRpcParamTypeList:
			size = paramStream->readInt();
			
			while(size--) listElements.push_back(getNextParam(paramStream,listElements));
				nextParam = listElements;
			break;
		default:
			// Could not match parameter type so we bail
			throw RpcException("Invalid parameter type");
	}
	return nextParam;
}

void CommandDispatcher::parseCommandParams( DataInputStreamPtr paramStream, 
											list<boost::any>& params )
{
	try {
		
		// Keep reading data as long as we have something more that the size byte
		while( paramStream->remainingBytes() > 1 ) {
			boost::any nextParam = getNextParam(paramStream,params);
						
			// Add param to our list
			params.push_back( nextParam );
		}	
	}
	catch( const RpcException& rpcExcept ) {
		
		cout << "Could not read data. Error: " << rpcExcept.getReason() << endl;
		throw;
	}
}

string CommandDispatcher::retrieveCommandName( DataInputStreamPtr cmdStream )
{
	string cmdName = "";
	
	// Read a string from the stream
	try {
		cmdName = cmdStream->readUTF();
	}
	catch( const RpcException& except ) {
		
		cout << "Could not read command name from stream" << endl;
		throw;
	}
	
	return cmdName;
}

void CommandDispatcher::dispatchCommand( const string& cmdName,
										 int sender,
										 const list<boost::any>& params )
{
	try {
		// Lock access to the list
		boost::mutex::scoped_lock lock( handlerListMutex );
		
		// Find the list of command handlers that correspond to this command
		map<string, RpcCommandHandlerList>::iterator it = registeredCommands.find( cmdName );
		
		if( it != registeredCommands.end() ) {
			
			// Iterate the list of commands and notify them of the command
			RpcCommandHandlerList::iterator cmdIt = it->second.begin();
			while( cmdIt != it->second.end() ) {
				
				// We need at least numParams amount of params so that 
				// we dont index outside the list
				if( (*cmdIt)->getNumParams() <= params.size() ) {
					// Notify
					(*cmdIt)->handleCommand( sender, params );
				}
				else {
				  cout << "Invalid number of params: " << cmdName << endl;
				}
				
				// Next command
				++cmdIt;
			}
		}
		else {
			
			// Maybe log something here.
			cout << "Could not find any handlers for command: " << 
					cmdName << 
					endl;
		}
	}
	catch( const boost::lock_error& e ) {
		
		cout << "Could not lock mutex" << endl;
	}
}

}