File: bzfquery.py

package info (click to toggle)
bzflag 2.0.8.20060605
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k
  • size: 29,168 kB
  • ctags: 33,169
  • sloc: cpp: 132,719; ansic: 14,122; sh: 13,441; makefile: 2,330; php: 428; python: 334; perl: 287; objc: 243; xml: 180
file content (359 lines) | stat: -rwxr-xr-x 9,240 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Inspired by misc/bzfquery.pl
#
# Author: Frédéric Jolliton [aka FredCods]
#         <fj@tuxee.net>
#         <fred@jolliton.com>
#
# This script can be used either as a module,
# a CGI or directly from the command line.
#
# Example of use:
#
#   s = Server( 'bzflag3.tuxee.net' , 45154 )
#   game = s.queryGame()
#   print game[ 'style' ]
#   teams , players = s.queryPlayers()
#   print teams.get( 'rogue' )
#   print players[ 0 ]
#
# outputs something like the following:
#
# ['flags', 'jumping', 'ricochet', 'shaking']
# {'score': 0, 'won': 0, 'lost': 0, 'size': 1}
# {'lost': 0, 'pId': 0, 'sign': 'FredCods', 'won': 0, 'tks': 0, 'team': 'observer', 'type': 0, 'email': ''}
#
#

import sys
import cgi
import os
import struct
import socket
import time
import select

#
# If true, display detailed HTML output (through cgitb) when an
# exception occurs while running as a CGI.
#
enableCgiTb = True

#
# If true, then allow ?host=<hostname>&port=<port> on the URL
# instead of using only defaultHostname and defaultPort.
#
allowCgiParameters = True

#
# IMPORTANT: Change the values here for CGI use
#
defaultHostname = 'localhost'
defaultPort = 5154

#
# Default timeout in seconds.
#
# Set to None for no timeout (not a good idea)
#
defaultTimeout = 10.

#
# If true, receive() throws an Error exception when a timeout
# occurs; otherwise it returns None.
#
hardTimeout = True

class Error( Exception ) :

	'''Exception raised by this module for timeouts, unexpected
	answers and servers that are not compatible.'''

def s2n( s ) :

	'''Return the integer obtained by reading the characters of 's'
	as base-256 digits, most significant first (0 for an empty
	string).'''

	value = 0
	for char in s :
		value = 256 * value + ord( char )
	return value

#
# ( name , bit mask ) pairs used by decodeStyle() to turn the numeric
# game style into a list of names. Names are reported in the order
# they appear here.
#
styles = [
	( 'CTF'         , 0x0001 ) ,
	( 'flags'       , 0x0002 ) ,
	( 'jumping'     , 0x0008 ) ,
	( 'inertia'     , 0x0010 ) ,
	( 'ricochet'    , 0x0020 ) ,
	( 'shaking'     , 0x0040 ) ,
	( 'antidote'    , 0x0080 ) ,
	( 'handicap'    , 0x0100 ) ,
	( 'rabbit-hunt' , 0x0200 )
]

#
# Team names, indexed by the numeric team value found in the team
# and player records decoded by Server.queryPlayers(). The order is
# also the one used by getAndPrintStat() to list the teams.
#
teamsName = [
	'rogue' ,
	'red' ,
	'green' ,
	'blue' ,
	'purple' ,
	'observer' ,
	'rabbit'
]

#
# Player type names, indexed by the 'type' value of a player record
# (see getAndPrintStat()).
#
playerType = [
	'tank' ,
	'observer' ,
	'robot tank'
]

def decodeStyle( n ) :

	'''Return the list of style names whose bit is set in 'n', in
	the order they are listed in the 'styles' table.'''

	return [ name for name , mask in styles if n & mask ]

def receive( sock , size , timeout = None ) :

	'''Read at most 'size' bytes from socket 'sock'.

	Without a timeout this is a plain blocking recv(). With a
	timeout, first wait up to 'timeout' seconds for the socket to
	become readable; if it does not, either raise Error( 'Timeout' )
	or return None, depending on the hardTimeout global setting.'''

	if timeout is None :
		return sock.recv( size )

	deadline = time.time() + timeout
	ready = False
	while not ready :
		remaining = deadline - time.time()
		if remaining <= 0 :
			# Nothing showed up in time.
			if not hardTimeout :
				return None
			raise Error( 'Timeout' )
		readable = select.select( [ sock ] , [] , [] , remaining )[ 0 ]
		ready = sock in readable
	return sock.recv( size )

class Server :

	'''Connection to a BZFlag server speaking protocol '0026'.

	The constructor connects and checks the greeting sent by the
	server. Requests are 4 bytes long (a zero 16-bit value followed by
	a 2-character code); answers are packets made of a 4-byte header
	(big-endian 16-bit payload size, 2-character code) followed by the
	payload. Problems detected by this class raise Error.'''

	def __init__( self , host = '127.0.0.1' , port = 5154 ) :

		'''Connect to host:port and validate the 9-byte greeting.

		Raises Error if the peer does not identify itself as a BZFlag
		server or announces a protocol other than '0026'. The socket
		is closed before any exception leaves the constructor.'''

		self.sock = socket.socket( socket.AF_INET , socket.SOCK_STREAM )
		handshakeDone = False
		try :
			self.sock.connect( ( host , port ) )
			header = self._receiveExact( 9 )
			magic , self.protocol , self.id = \
				struct.unpack( '4s4sb' , header )
			if magic != 'BZFS' :
				raise Error( 'Not a bzflag server.' )
			if self.protocol not in [ '0026' ] :
				raise Error( 'Not compatible with server.' )
			handshakeDone = True
		finally :
			# Do not leak the descriptor when the handshake fails.
			if not handshakeDone :
				self.sock.close()

	def close( self ) :

		'''Close the connection to the server.'''

		self.sock.close()

	def _receiveExact( self , size ) :

		'''Return exactly 'size' bytes read from the server.

		A single recv() may return less than requested, so keep
		reading until everything has arrived. Each read waits up to
		defaultTimeout seconds. Raises Error on timeout or if the
		server closes the connection early. A size of 0 returns an
		empty string without touching the socket.'''

		chunks = []
		remaining = size
		while remaining > 0 :
			chunk = receive( self.sock , remaining , defaultTimeout )
			if chunk is None :
				# receive() returns None when hardTimeout is off.
				raise Error( 'Timeout' )
			if not chunk :
				raise Error( 'Connection closed by server.' )
			chunks.append( chunk )
			remaining -= len( chunk )
		return ''.join( chunks )

	def cmd( self , command ) :

		'''Send the 2-character 'command' and return the payload of
		the answer carrying the same code.

		Raises Error if 'command' is not exactly 2 characters long.'''

		if len( command ) != 2 :
			raise Error( 'Command must be 2 characters long.' )
		self.sock.sendall( struct.pack( '>2H' , 0 , s2n( command ) ) )
		return self.getResponse( command )

	def _getPacket( self ) :

		'''Read one packet and return it as ( code , payload ).'''

		size , code = struct.unpack( '>H2s' , self._receiveExact( 4 ) )
		data = self._receiveExact( size )
		return code , data

	def getResponse( self , expectedCode ) :

		'''Return the payload of the first packet whose code is
		'expectedCode', discarding every other packet.

		Packets are read until defaultTimeout seconds have elapsed
		(forever if defaultTimeout is None). Raises Error if no
		matching packet was seen in that time.

		NOTE(review): passing None as 'expectedCode' never matches any
		packet, so it always ends with an Error.'''

		if defaultTimeout is None :
			timeLimit = None
		else :
			timeLimit = time.time() + defaultTimeout
		code = None
		while timeLimit is None or time.time() < timeLimit :
			code , data = self._getPacket()
			if code == expectedCode :
				return data
		if code is not None :
			raise Error( 'Got wrong response code (got %r, expected %r)' \
				     % ( code , expectedCode ) )
		raise Error( 'No answer' )

	def queryGame( self ) :

		'''Send the 'qg' query and return the game settings.

		The result is a dictionary with the keys 'style' (list of
		style names), 'teams' (team name -> ( size , max ) ),
		'maxPlayerScore', 'maxTeamScore', 'maxPlayers', 'maxShots',
		'maxTime' and 'elapsedTime', plus 'shake' (dictionary with
		'wins' and 'timeout') when the 'shaking' style is set.'''

		data = self.cmd( 'qg' )
		data = struct.unpack( '>21H' , data )
		style , maxPlayers , maxShots , rogueSize , \
			redSize , greenSize , blueSize , purpleSize , obsSize, \
			rogueMax , redMax , greenMax , blueMax , purpleMax , obsMax, \
			shakeWins , shakeTimeout , maxPlayerScore , maxTeamScore , \
			maxTime , elapsedTime \
			= data
		style = decodeStyle( style )
		teams = {
			'rogue'    : ( rogueSize  , rogueMax ) ,
			'red'      : ( redSize    , redMax ) ,
			'green'    : ( greenSize  , greenMax ) ,
			'blue'     : ( blueSize   , blueMax ) ,
			'purple'   : ( purpleSize , purpleMax ) ,
			'observer' : ( obsSize    , obsMax ) ,
		}
		infos = {
			'style' : style ,
			'teams' : teams ,
			'maxPlayerScore' : maxPlayerScore ,
			'maxTeamScore' : maxTeamScore ,
			'maxPlayers' : maxPlayers ,
			'maxShots' : maxShots ,
			# Integer operands: under Python 2 the fraction is dropped,
			# unlike the shake timeout below which is divided by 10.
			'maxTime' : maxTime / 10 ,
			'elapsedTime' : elapsedTime / 10 ,
		}
		if 'shaking' in style :
			infos[ 'shake' ] = { 'wins' : shakeWins , 'timeout' : shakeTimeout / 10. }
		return infos

	def queryPlayers( self ) :

		'''Send the 'qp' query and return ( teamsInfo , playersInfo ).

		teamsInfo maps a team name to a dictionary with the keys
		'size', 'score', 'won' and 'lost'. playersInfo is a list with
		one dictionary per player, with the keys 'pId', 'type',
		'team', 'score', 'won', 'lost', 'tks', 'sign' and 'email'.'''

		data = self.cmd( 'qp' )
		# The answer also carries a team count, but the count found in
		# the 'tu' packet below is the one used for decoding.
		numPlayers = struct.unpack( '>2H' , data )[ 1 ]
		data = self.getResponse( 'tu' )
		numTeams , data = ord( data[ 0 ] ) , data[ 1 : ]
		teamsInfo = {}
		for i in range( numTeams ) :
			# Each team record is 8 bytes long.
			teamInfo , data = data[ : 8 ] , data[ 8 : ]
			team , size , won , lost = struct.unpack( '>4H' , teamInfo )
			teamsInfo[ teamsName[ team ] ] = {
				'size'  : size ,
				'score' : won - lost ,
				'won'   : won ,
				'lost'  : lost
			}
		playersInfo = []
		for i in range( numPlayers ) :
			# One 'ap' packet is read for each announced player.
			data = self.getResponse( 'ap'  )
			pId , kind , team , won , lost , tks , sign , email = \
				struct.unpack( '>b5H32s128s' , data )
			playersInfo.append( {
				'pId'   : pId ,
				'type'  : kind ,
				'team'  : teamsName[ team ] ,
				'score' : won - lost ,
				'won'   : won ,
				'lost'  : lost ,
				'tks'   : tks ,
				# Text fields are padded with NUL characters.
				'sign'  : sign.rstrip( '\x00' ) ,
				'email' : email.rstrip( '\x00' )
			} )
		return teamsInfo , playersInfo

def getAndPrintStat( hostname , port ) :

	'''Query the BZFlag server at hostname:port and print a plain
	text report (game settings, teams, players) on standard output.

	When the QUERY_STRING environment variable is set, a
	'Content-Type: text/plain' header is printed first. Players are
	listed by decreasing score. The connection is closed before
	returning, whether the queries succeeded or not.'''

	s = Server( hostname , port )
	try :
		game = s.queryGame()

		if 'QUERY_STRING' in os.environ :
			print( 'Content-Type: text/plain\n' )
		print( 'Statistics of the BZFlag server %s (port %s)' % ( hostname , port ) )
		print( '' )
		print( '--[ GAME ]' + '-' * 40 )
		print( '' )
		print( 'Style: ' + ' '.join( game[ 'style' ] ) )
		print( '' )
		print( 'Max players: %s   Max shots: %s' % ( game[ 'maxPlayers' ] , game[ 'maxShots' ] ) )
		print( '' )
		print( 'Teams     Size   Max' )
		print( '-' * 20 )
		for team in teamsName :
			t = game[ 'teams' ].get( team )
			if t is not None :
				print( '%-8s %5d %5d' % ( team , t[ 0 ] , t[ 1 ] ) )
		shaking = game.get( 'shake' )
		if shaking :
			print( '' )
			print( 'Shaking bad flag: wins: %d, timeout: %g' % ( shaking[ 'wins' ] , shaking[ 'timeout' ] ) )
		print( '' )
		print( 'Max player score: %d' % game[ 'maxPlayerScore' ] )
		print( 'Max team score: %d' % game[ 'maxTeamScore' ] )
		print( 'Max time: %g' % game[ 'maxTime' ] )
		print( 'Time elapsed: %g' % game[ 'elapsedTime' ] )

		teams , players = s.queryPlayers()
		print( '' )
		print( '--[ TEAMS ]' + '-' * 39 )
		print( '' )
		print( 'Teams     Size  Score  Won  Lost' )
		print( '-' * 32 )
		for team in teamsName :
			t = teams.get( team )
			if t is not None :
				print( '%-8s %5d %5d %5d %5d' \
					% ( team , t[ 'size' ] , t[ 'score' ] , t[ 'won' ] , t[ 'lost' ] ) )

		print( '' )
		print( '--[ PLAYERS ]' + '-' * 37 )
		print( '' )
		print( 'Team     Score   Won  Lost Type       Sign' )
		print( '-' * 60 )
		# Highest score first; players with the same score keep the
		# order in which the server sent them.
		players.sort( key = lambda player : player[ 'score' ] , reverse = True )
		for player in players :
			try :
				kind = playerType[ player[ 'type' ] ]
			except ( IndexError , TypeError ) :
				# Report the offending type value itself.
				kind = 'Unknown player type %s' % player.get( 'type' )
			name = player[ 'sign' ]
			if player[ 'email' ] :
				name = name + ' <%s>' % player[ 'email' ]
			print( '%-8s %5d %5d %5d %-10s %s' \
				% ( player[ 'team' ] , player[ 'score' ] , player[ 'won' ] , player[ 'lost' ] , kind , name ) )
	finally :
		# The Server object is local to this function: release its
		# socket instead of leaving it to the garbage collector.
		s.sock.close()

def usage() :

	'''Print the command line help on standard output.'''

	helpText = '''Usage: bzfquery.py [OPTIONS] [hostname [port]]

 -h, --help  Display this help.

Report bugs to <bzflag@tuxee.net>.'''
	print( helpText )

def main() :

	'''Entry point for both CGI and command line use.

	CGI mode is selected when the QUERY_STRING environment variable
	is set: the host and port are taken from the 'host' and 'port'
	URL parameters if allowCgiParameters is true, falling back to
	defaultHostname and defaultPort.

	Otherwise the command line is parsed: [hostname [port]], with
	-h/--help printing the help and exiting with status 0. An unknown
	option, too many arguments or a port that is not an integer print
	the help and exit with status 2.

	The statistics of the selected server are then printed.'''

	hostname , port = defaultHostname , defaultPort
	if 'QUERY_STRING' in os.environ :
		if enableCgiTb :
			import cgitb
			cgitb.enable()

		if allowCgiParameters :
			# NOTE(review): host and port come straight from the URL,
			# so anybody able to reach this CGI can make it connect to
			# an arbitrary host and port. Set allowCgiParameters to
			# False if that is not acceptable.
			form = cgi.FormContentDict()
			hostname = form.get( 'host' , [ defaultHostname ] )[ 0 ]
			try :
				port = int( form.get( 'port' , [ defaultPort ] )[ 0 ] )
			except ( ValueError , TypeError , IndexError ) :
				# Best effort: keep the default port on bad input.
				pass
	else :
		import getopt
		try :
			options , parameters = getopt.getopt( sys.argv[ 1 : ] , 'h' , ( 'help' , ) )
		except getopt.GetoptError :
			# Unknown option: explain instead of showing a traceback.
			sys.stderr.write( 'bzfquery.py: %s\n' % ( sys.exc_info()[ 1 ] , ) )
			usage()
			sys.exit( 2 )

		for option , argument in options :
			if option in [ '-h' , '--help' ] :
				usage()
				sys.exit( 0 )

		if len( parameters ) > 2 :
			# Wrong invocation, so report failure to the caller.
			usage()
			sys.exit( 2 )
		if parameters :
			hostname = parameters[ 0 ]
		if len( parameters ) == 2 :
			try :
				port = int( parameters[ 1 ] )
			except ValueError :
				sys.stderr.write( 'bzfquery.py: invalid port %r\n' % ( parameters[ 1 ] , ) )
				usage()
				sys.exit( 2 )

	getAndPrintStat( hostname , port )

# Only run the query when executed as a script (command line or CGI);
# importing this file as a module just defines the names above.
if __name__ == '__main__' :
	main()