1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
import SocketServer, os, datetime, sys, random, time
import simplejson
class LQSCommand:
def __init__(self, line):
self.raw_line = line
self.line = self.raw_line.strip()
l = self.line.split(' ')
self.name = l[0]
if len(l) > 1:
self.args = [arg for arg in l[1:] if arg]
else:
self.args = []
class LQSMessage(dict):
def __init__(self, item=None, args=None, jsonvalue=None):
dict.__init__(self)
if jsonvalue:
self.decode(jsonvalue)
else:
self['id'] = '%d_%d' % (int(time.time()), int(random.random()*1000000))
self['item'] = item
self['args'] = args
def encode(self):
return simplejson.dumps(self)
def decode(self, value):
self.update(simplejson.loads(value))
def is_empty(self):
if self['item'] == None:
return True
return False
class LQSServer(SocketServer.UDPServer):
PORT = 5151
TIMEOUT = 30
MAXSIZE = 8192
def __init__(self, server_address, RequestHandlerClass, iterator, args=None):
server_address = (server_address, self.PORT)
SocketServer.UDPServer.__init__(self, server_address, RequestHandlerClass)
self.count = 0
self.iterator = iterator
self.args = args
self.start = datetime.datetime.now()
self.end = None
self.extant = []
class LQSHandler(SocketServer.DatagramRequestHandler):
def get_cmd(self):
return LQSCommand(self.rfile.readline())
def build_msg(self):
if not self.server.iterator:
return LQSMessage(None)
try:
item = self.server.iterator.next()
msg = LQSMessage(item, self.server.args)
return msg
except StopIteration:
self.server.iterator = None
return LQSMessage(None)
def respond(self, msg):
self.wfile.write(msg.encode())
def check_extant(self):
if len(self.server.extant) == 0 and not self.server.iterator:
self.server.end = datetime.datetime.now()
delta = self.server.end - self.server.start
print 'Total Processing Time: %s' % delta
print 'Total Messages Processed: %d' % self.server.count
def do_debug(self, cmd):
args = {'extant' : self.server.extant,
'count' : self.server.count}
msg = LQSMessage('debug', args)
self.respond(msg)
def do_next(self, cmd):
out_msg = self.build_msg()
if not out_msg.is_empty():
self.server.count += 1
self.server.extant.append(out_msg['id'])
self.respond(out_msg)
def do_delete(self, cmd):
if len(cmd.args) != 1:
self.error(cmd, 'delete command requires message id')
else:
mid = cmd.args[0]
try:
self.server.extant.remove(mid)
except ValueError:
self.error(cmd, 'message id not found')
args = {'deleted' : True}
msg = LQSMessage(mid, args)
self.respond(msg)
self.check_extant()
def error(self, cmd, error_msg=None):
args = {'error_msg' : error_msg,
'cmd_name' : cmd.name,
'cmd_args' : cmd.args}
msg = LQSMessage('error', args)
self.respond(msg)
def do_stop(self, cmd):
sys.exit(0)
def handle(self):
cmd = self.get_cmd()
if hasattr(self, 'do_%s' % cmd.name):
method = getattr(self, 'do_%s' % cmd.name)
method(cmd)
else:
self.error(cmd, 'unrecognized command')
class PersistHandler(LQSHandler):
def build_msg(self):
if not self.server.iterator:
return LQSMessage(None)
try:
obj = self.server.iterator.next()
msg = LQSMessage(obj.id, self.server.args)
return msg
except StopIteration:
self.server.iterator = None
return LQSMessage(None)
def test_file(path, args=None):
l = os.listdir(path)
if not args:
args = {}
args['path'] = path
s = LQSServer('', LQSHandler, iter(l), args)
print "Awaiting UDP messages on port %d" % s.PORT
s.serve_forever()
def test_simple(n):
l = range(0, n)
s = LQSServer('', LQSHandler, iter(l), None)
print "Awaiting UDP messages on port %d" % s.PORT
s.serve_forever()
|