1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
#! /usr/bin/env python
# encoding: utf-8
import sys
if sys.hexversion < 0x020400f0: from sets import Set as set
import sys,random,time,threading,Queue,traceback
import Build,Utils,Logs,Options
import pproc
from Logs import debug,error
from Constants import*
GAP=15
# Keep a handle on the stock implementation so the wrapper below can delegate to it.
run_old = threading.Thread.run


def run(*args, **kwargs):
    """Run a thread body through the stock ``Thread.run``, reporting failures.

    ``KeyboardInterrupt`` and ``SystemExit`` are re-raised untouched.  Any
    other exception escaping the thread body is handed to ``sys.excepthook``
    together with the current exception info instead of propagating.
    """
    try:
        run_old(*args, **kwargs)
    except (KeyboardInterrupt, SystemExit):
        # Interpreter-level exit requests must keep propagating.
        raise
    except:
        # Deliberately catch everything else and route it to the global hook.
        sys.excepthook(*sys.exc_info())


# Install the wrapper for every thread created from now on.
threading.Thread.run = run
class TaskConsumer(threading.Thread):
    """Daemon worker thread that executes tasks supplied by a master object.

    The master is expected to expose ``ready`` and ``out`` queues, a ``stop``
    flag and an ``error_handler(task)`` method; these are the only members
    of the master that this class touches.
    """

    def __init__(self, m):
        """Create the worker for master ``m`` and start it immediately."""
        threading.Thread.__init__(self)
        self.setDaemon(1)
        self.master = m
        self.start()

    def run(self):
        """Thread entry point: run ``loop`` and discard anything it raises."""
        try:
            self.loop()
        except:
            # NOTE(review): every error escaping the loop is silently dropped;
            # presumably this hides noise from daemon threads at interpreter
            # shutdown -- confirm before narrowing.
            pass

    def loop(self):
        """Forever take a task from ``master.ready``, run it, push it to ``master.out``.

        The outcome is stored on the task as ``hasrun`` (``SUCCESS``,
        ``CRASHED``, ``MISSING`` or ``EXCEPTION``), plus ``err_code`` or
        ``err_msg`` where relevant.  ``master.error_handler`` is called for
        every outcome other than ``SUCCESS``.
        """
        master = self.master
        while True:
            task = master.ready.get()

            # Once the master has been stopped, tasks are returned unexecuted.
            if master.stop:
                master.out.put(task)
                continue

            try:
                task.generator.bld.printout(task.display())
                task_class = task.__class__
                # A truthy class-level ``stat`` replaces the regular run call.
                if task_class.stat:
                    ret = task_class.stat(task)
                else:
                    ret = task.call_run()
            except Exception:
                task.err_msg = Utils.ex_stack()
                task.hasrun = EXCEPTION
                master.error_handler(task)
                master.out.put(task)
                continue

            if ret:
                # A truthy return value is treated as a failure code.
                task.err_code = ret
                task.hasrun = CRASHED
            else:
                try:
                    task.post_run()
                except OSError:
                    task.hasrun = MISSING
                except:
                    task.err_msg = Utils.ex_stack()
                    task.hasrun = EXCEPTION
                else:
                    task.hasrun = SUCCESS

            if task.hasrun != SUCCESS:
                master.error_handler(task)
            master.out.put(task)
class Parallel(object):
    """Feed the tasks of a build to ``TaskConsumer`` threads and collect the results.

    Tasks are obtained in groups from ``bld.task_manager``, examined one by
    one in ``start`` and, when runnable, placed on the ``ready`` queue.
    Finished tasks come back through the ``out`` queue.
    """

    def __init__(self, bld, j=2):
        """Set up the scheduler state.

        bld -- build context; only ``bld.task_manager`` is read here
        j   -- number of consumer threads created on first use
        """
        self.numjobs = j
        self.manager = bld.task_manager
        # Total shown in task positions; grows when tasks add ``more_tasks``.
        self.total = self.manager.total()
        # Tasks waiting to be examined by ``start``.
        self.outstanding = []
        # Upper bound on in-flight tasks; replaced by ``manager.get_next_set()``.
        self.maxjobs = sys.maxint
        # Tasks that answered ASK_LATER and have to be examined again.
        self.frozen = []
        # ready: tasks handed to the consumers; out: tasks handed back.
        self.ready = Queue.Queue(0)
        self.out = Queue.Queue(0)
        # Number of tasks put on ``ready`` and not yet taken from ``out``.
        self.count = 0
        # Not used by the code visible in this class.
        self.stuck = 0
        # Counter used for the first element of ``tsk.position``.
        self.processed = 1
        # Consumer threads, created lazily by ``start``.
        self.consumers = None
        # stop: leave the main loop; error: at least one task failed.
        self.stop = False
        self.error = False

    def get_next(self):
        """Return the next outstanding task, or None when there is none."""
        if not self.outstanding:
            return None
        return self.outstanding.pop(0)

    def postpone(self, tsk):
        """Store ``tsk`` in the frozen list, at a randomly chosen end."""
        if random.randint(0, 1):
            self.frozen.insert(0, tsk)
        else:
            self.frozen.append(tsk)

    def refill_task_list(self):
        """Make sure ``outstanding`` holds tasks, waiting on consumers if needed."""
        # Throttle: do not run too far ahead of the consumers.
        while self.count > self.numjobs + GAP or self.count > self.maxjobs:
            self.get_out()

        while not self.outstanding:
            if self.count:
                # Wait for one in-flight task; it may contribute more tasks.
                self.get_out()

            if self.frozen:
                self.outstanding += self.frozen
                self.frozen = []
            elif not self.count:
                # Nothing in flight and nothing postponed: fetch the next set.
                (self.maxjobs, tmp) = self.manager.get_next_set()
                if tmp:
                    self.outstanding += tmp
                break

    def get_out(self):
        """Block until a consumer returns a task, then account for it."""
        ret = self.out.get()
        self.manager.add_finished(ret)
        if not self.stop and getattr(ret, 'more_tasks', None):
            self.outstanding += ret.more_tasks
            self.total += len(ret.more_tasks)
        self.count -= 1

    def error_handler(self, tsk):
        """Record a task failure; request a stop unless ``Options.options.keep`` is set."""
        if not Options.options.keep:
            self.stop = True
        self.error = True

    def start(self):
        """Main loop: examine every task and dispatch the runnable ones."""
        while not self.stop:
            self.refill_task_list()

            tsk = self.get_next()
            if not tsk:
                if self.count:
                    # Tasks still in flight may add new ones once they finish.
                    continue
                else:
                    break

            if tsk.hasrun:
                # Already marked as run: account for it and move on.  Without
                # this ``continue`` the task fell through to runnable_status()
                # and could be counted, finished or executed a second time.
                self.processed += 1
                self.manager.add_finished(tsk)
                continue

            try:
                st = tsk.runnable_status()
            except Exception:
                tsk.err_msg = Utils.ex_stack()
                tsk.hasrun = EXCEPTION
                self.processed += 1
                self.error_handler(tsk)
                self.manager.add_finished(tsk)
                continue

            if st == ASK_LATER:
                self.postpone(tsk)
            elif st == SKIP_ME:
                self.processed += 1
                tsk.hasrun = SKIPPED
                self.manager.add_finished(tsk)
            else:
                tsk.position = (self.processed, self.total)
                self.count += 1
                self.ready.put(tsk)
                self.processed += 1

                # Create the consumer threads on first use.
                if not self.consumers:
                    self.consumers = [TaskConsumer(self) for i in xrange(self.numjobs)]

        # After an error, wait for the tasks that are still in flight.
        while self.error and self.count:
            self.get_out()

        assert (self.count == 0 or self.stop)
|