File: lqs.py

package info (click to toggle)
python-boto 1.9b-4
  • links: PTS, VCS
  • area: main
  • in suites: squeeze
  • size: 1,820 kB
  • ctags: 2,583
  • sloc: python: 16,337; makefile: 106
file content (152 lines) | stat: -rw-r--r-- 4,475 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import SocketServer, os, datetime, sys, random, time
import simplejson

class LQSCommand:
    """A single text command parsed from one line of client input.

    Attributes set by the constructor:
      raw_line -- the line exactly as it was received
      line     -- raw_line with surrounding whitespace stripped
      name     -- the text before the first space of ``line``
      args     -- the remaining space-separated tokens, empty ones dropped
    """

    def __init__(self, line):
        self.raw_line = line
        self.line = line.strip()
        tokens = self.line.split(' ')
        self.name = tokens[0]
        # Consecutive spaces yield empty tokens; those are discarded.
        # Slicing a one-element list gives [], so no length check is needed.
        self.args = [token for token in tokens[1:] if token]

class LQSMessage(dict):
    """Dictionary-based message that can be encoded to / decoded from JSON.

    A message built from ``item``/``args`` holds three keys: 'id', 'item'
    and 'args'.  A message built from ``jsonvalue`` holds whatever keys the
    decoded JSON value provides.
    """

    def __init__(self, item=None, args=None, jsonvalue=None):
        """Create a message.

        item      -- payload stored under 'item' (None marks an empty message)
        args      -- extra data stored under 'args'
        jsonvalue -- if truthy, a JSON string to populate the message from;
                     ``item`` and ``args`` are then ignored
        """
        dict.__init__(self)
        if jsonvalue:
            self.decode(jsonvalue)
        else:
            # The id joins the current time in whole seconds with a random
            # integer below one million.
            self['id'] = '%d_%d' % (int(time.time()), int(random.random()*1000000))
            self['item'] = item
            self['args'] = args

    def encode(self):
        """Return this message serialized as a JSON string."""
        return simplejson.dumps(self)

    def decode(self, value):
        """Merge the keys of the JSON document ``value`` into this message."""
        # NOTE(review): dict.update needs a mapping (or key/value pairs), so
        # ``value`` is presumably always a JSON object -- confirm with callers.
        self.update(simplejson.loads(value))

    def is_empty(self):
        """Return True when the message carries no item.

        Only None counts as "no item"; falsy payloads such as 0 or '' are
        real items.  A message without an 'item' key at all (possible after
        decoding arbitrary JSON) is reported as empty instead of raising
        KeyError.
        """
        return self.get('item') is None

class LQSServer(SocketServer.UDPServer):
    """UDP server that carries the state its request handlers work on.

    The server itself only stores state; request handling is done by the
    RequestHandlerClass passed to the constructor.
    """

    # Port the server binds to; the constructor pairs it with the host.
    PORT = 5151
    # TIMEOUT and MAXSIZE are defined here but not read inside this class.
    TIMEOUT = 30
    MAXSIZE = 8192

    def __init__(self, server_address, RequestHandlerClass, iterator, args=None):
        """Bind to (server_address, PORT) and initialise the bookkeeping.

        server_address      -- host part only; PORT supplies the port
        RequestHandlerClass -- handler class forwarded to UDPServer
        iterator            -- source of items, stored as ``self.iterator``
        args                -- stored as ``self.args``
        """
        SocketServer.UDPServer.__init__(self, (server_address, self.PORT),
                                        RequestHandlerClass)
        self.iterator = iterator
        self.args = args
        # Bookkeeping fields: a counter, a list and start/end timestamps.
        self.extant = []
        self.count = 0
        self.end = None
        self.start = datetime.datetime.now()

class LQSHandler(SocketServer.DatagramRequestHandler):
    """Request handler that dispatches one-line text commands to do_* methods.

    ``handle`` reads a single command line from the request, looks up a
    method named ``do_<command name>`` and calls it with the parsed
    LQSCommand.  Replies are JSON-encoded LQSMessage objects written to
    ``self.wfile``.
    """

    def get_cmd(self):
        """Read one line from the request and parse it into an LQSCommand."""
        return LQSCommand(self.rfile.readline())

    def build_msg(self):
        """Return a message for the next item, or an empty message when done.

        When the server's iterator raises StopIteration it is replaced by
        None, so later calls return an empty message without touching it.
        """
        if not self.server.iterator:
            return LQSMessage(None)
        try:
            # Only the iterator call is expected to raise StopIteration.
            item = self.server.iterator.next()
        except StopIteration:
            self.server.iterator = None
            return LQSMessage(None)
        return LQSMessage(item, self.server.args)

    def respond(self, msg):
        """Write the JSON encoding of ``msg`` to the reply stream."""
        self.wfile.write(msg.encode())

    def check_extant(self):
        """Print timing and message totals once nothing is left to do.

        "Nothing left" means the extant list is empty and the iterator has
        been cleared; otherwise this method does nothing.
        """
        if self.server.extant or self.server.iterator:
            return
        self.server.end = datetime.datetime.now()
        delta = self.server.end - self.server.start
        # Parenthesised single-argument print behaves the same as the
        # Python 2 print statement.
        print('Total Processing Time: %s' % delta)
        print('Total Messages Processed: %d' % self.server.count)

    def do_debug(self, cmd):
        """Reply with a 'debug' message holding the extant ids and the count."""
        args = {'extant' : self.server.extant,
                'count' : self.server.count}
        self.respond(LQSMessage('debug', args))

    def do_next(self, cmd):
        """Reply with the next message, tracking it if it is not empty.

        A non-empty message increments the server's count and has its id
        appended to the extant list.  An empty message is sent as-is.
        """
        out_msg = self.build_msg()
        if not out_msg.is_empty():
            self.server.count += 1
            self.server.extant.append(out_msg['id'])
        self.respond(out_msg)

    def do_delete(self, cmd):
        """Remove a message id from the extant list and confirm the removal.

        The command must carry exactly one argument, the message id.  A
        wrong argument count or an unknown id produces a single error reply
        and nothing else.
        """
        if len(cmd.args) != 1:
            self.error(cmd, 'delete command requires message id')
            return
        mid = cmd.args[0]
        try:
            self.server.extant.remove(mid)
        except ValueError:
            self.error(cmd, 'message id not found')
            # Stop here: falling through would write a second, contradictory
            # "deleted" reply for the same request.
            return
        self.respond(LQSMessage(mid, {'deleted' : True}))
        self.check_extant()

    def error(self, cmd, error_msg=None):
        """Reply with an 'error' message describing the failed command."""
        args = {'error_msg' : error_msg,
                'cmd_name' : cmd.name,
                'cmd_args' : cmd.args}
        self.respond(LQSMessage('error', args))

    def do_stop(self, cmd):
        """Raise SystemExit with status 0 in response to a 'stop' command."""
        # NOTE(review): confirm the serving loop lets SystemExit propagate
        # instead of treating it as a handler error.
        sys.exit(0)

    def handle(self):
        """Parse the incoming command and dispatch it to its do_* method.

        A command with no matching do_* method gets an 'unrecognized
        command' error reply.
        """
        cmd = self.get_cmd()
        method = getattr(self, 'do_%s' % cmd.name, None)
        if method is None:
            self.error(cmd, 'unrecognized command')
        else:
            method(cmd)

class PersistHandler(LQSHandler):
    """Handler variant whose messages carry an object's ``id`` as the item.

    The server's iterator is expected to yield objects exposing an ``id``
    attribute; that attribute, not the object itself, becomes the item.
    """

    def build_msg(self):
        """Return a message for the next object's id, or an empty message.

        When the iterator is exhausted it is replaced by None on the server
        and an empty message is returned.
        """
        source = self.server.iterator
        if source:
            try:
                return LQSMessage(source.next().id, self.server.args)
            except StopIteration:
                # Mark the source as finished so later calls skip it.
                self.server.iterator = None
        return LQSMessage(None)

def test_file(path, args=None):
    """Serve the directory entries of ``path`` as queue items.

    path -- directory whose entries (as returned by os.listdir) are served
    args -- optional dict passed along with every message; a 'path' key
            holding ``path`` is added to a copy of it

    Calls ``serve_forever`` on the server it creates.
    """
    entries = os.listdir(path)
    # Copy before adding 'path' so the caller's dict is never modified.
    args = dict(args) if args else {}
    args['path'] = path
    s = LQSServer('', LQSHandler, iter(entries), args)
    # Parenthesised single-argument print behaves the same as the
    # Python 2 print statement.
    print("Awaiting UDP messages on port %d" % s.PORT)
    s.serve_forever()

def test_simple(n):
    """Serve the integers 0 .. n-1 as queue items, with no extra args.

    Calls ``serve_forever`` on the server it creates.
    """
    server = LQSServer('', LQSHandler, iter(range(n)), None)
    print("Awaiting UDP messages on port %d" % server.PORT)
    server.serve_forever()