File: Tasks.py

package info (click to toggle)
boa-constructor 0.3.0-3
  • links: PTS
  • area: main
  • in suites: sarge
  • size: 8,188 kB
  • ctags: 8,857
  • sloc: python: 54,163; sh: 66; makefile: 36
file content (114 lines) | stat: -rw-r--r-- 3,735 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import threading

PRINT_TRACEBACKS = 0

class ThreadedTaskHandler:
    '''Rather than creating a new thread for each task, reuses existing
    threads for speed.
    '''

    def __init__(self, target_threads=1, limit_threads=0):
        self.queue = []
        self.cond = threading.Condition()
        self.running_threads = 0
        self.idle_threads = 0
        self.target_threads = target_threads
        self.limit_threads = limit_threads

    def addTask(self, task, args=(), kw=None):
        '''
        task is a callable object which will be executed in another
        thread.
        '''
        if 0:  # Set to 1 to get equivalent but slower processing.
            if kw is None: kw = {}
            t = threading.Thread(target=task, args=args, kwargs=kw)
            t.setDaemon(1)
            t.start()
            return

        cond = self.cond
        cond.acquire()
        try:
            self.queue.append((task, args, kw))
            if self.idle_threads < 1:
                if self.limit_threads < 1 or (self.running_threads
                                              < self.limit_threads):
                    t = threading.Thread(target=self.clientThread)
                    t.setDaemon(1)
                    self.running_threads = self.running_threads + 1
                    t.start()
            else:
                self.idle_threads = self.idle_threads - 1
            cond.notify()
        finally:
            cond.release()

    def clientThread(self):
        '''
        Performs tasks.
        '''
        exit_loop = 0
        while not exit_loop:
            task = args = kw = None
            cond = self.cond
            cond.acquire()
            try:
                queue = self.queue
                if len(queue) < 1:
                    if self.running_threads > self.target_threads:
                        exit_loop = 1
                        self.running_threads = self.running_threads - 1
                    else:
                        self.idle_threads = self.idle_threads + 1
                        cond.wait()
                if len(queue) > 0:
                    task, args, kw = queue[0]
                    del queue[0]
            finally:
                cond.release()

            if task is not None:
                #print 'performing task: %s(%s, %s)'%(task, args, kw)
                try:
                    if kw is not None:
                        apply(task, args, kw)
                    else:
                        apply(task, args)
                except SystemExit:
                    exit_loop = 1
                    self.running_threads = self.running_threads - 1
                except:
                    if PRINT_TRACEBACKS:
                        # The task ought to do its own error handling,
                        # but sometimes it doesn't.
                        import traceback
                        traceback.print_exc()


if __name__ == '__main__':
    from time import time, sleep
    evt = threading.Event()
    tth = ThreadedTaskHandler()
    start = 0
    end = 0

    class SpeedTest:
        def __init__(self, count):
            self.count = count
        def __call__(self):
            global end
            self.count = self.count - 1
            if self.count <= 0:
                end = time()
                evt.set()
            else:
                tth.addTask(self)

    count = 1000
    t = SpeedTest(count)
    start = time()
    tth.addTask(t)
    evt.wait()
    print 'Performed %d tasks in %d ms' % (count,
                                           int((end - start) * 1000))