File: Runner.py

package info (click to toggle)
lysdr 1.0~git20141206+dfsg1-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid, stretch
  • size: 832 kB
  • ctags: 1,685
  • sloc: python: 11,659; ansic: 1,335; makefile: 24
file content (192 lines) | stat: -rw-r--r-- 4,323 bytes parent folder | download | duplicates (10)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#! /usr/bin/env python
# encoding: utf-8
# WARNING! All changes made to this file will be lost!

import os,sys,random,atexit
try:
	from queue import Queue
except:
	from Queue import Queue
from waflib import Utils,Logs,Task,Errors
# Back-pressure factor: Parallel.refill_task_list() collects results for as
# long as more than numjobs*GAP dispatched tasks are still uncollected.
GAP=10
class TaskConsumer(Utils.threading.Thread):
	"""Daemon worker thread that executes whatever is placed on its ``ready`` queue.

	The thread starts itself as soon as it is constructed.
	"""

	def __init__(self):
		Utils.threading.Thread.__init__(self)
		# Private inbox; Parallel.init_task_pool() later rebinds this attribute
		# to a queue shared by several consumers.
		self.ready = Queue()
		self.setDaemon(1)
		self.start()

	def run(self):
		"""Run :meth:`loop` until it raises, then let the thread end silently.

		Swallowing every exception is deliberate: for example the ``None`` that
		``_free_resources`` enqueues is not callable, so calling it raises and
		that is what terminates the thread.
		"""
		try:
			self.loop()
		except:
			pass

	def loop(self):
		"""Take items from ``self.ready`` forever and execute each one.

		``Task.TaskBase`` instances are executed through ``process()``; any
		other item is called with this consumer as its only argument.
		"""
		while True:
			item = self.ready.get()
			if isinstance(item, Task.TaskBase):
				item.process()
			else:
				item(self)
# Module-level queue of idle TaskConsumer threads, accessed through
# get_pool() / put_pool() and emptied by _free_resources().
pool=Queue()
def get_pool():
	"""Return an idle consumer from the module pool, or a new one if none is available.

	:return: a ``TaskConsumer`` taken from ``pool`` without blocking, or a
		freshly created (and therefore already started) ``TaskConsumer``
	"""
	try:
		consumer = pool.get(False)
	except:
		# Deliberately broad: besides an empty queue this also covers ``pool``
		# having been reset to None by _free_resources().
		consumer = TaskConsumer()
	return consumer
def put_pool(x):
	"""Put the consumer ``x`` back on the module-level ``pool`` queue for later reuse."""
	pool.put(x)
def _free_resources():
	"""Stop and join every pooled consumer, then drop the pool.

	Registered with ``atexit`` in this module. Each consumer is sent ``None`` on
	its ``ready`` queue before any of them is joined, so all of them receive the
	stop item first.
	"""
	global pool
	consumers = []
	while pool.qsize():
		consumers.append(pool.get())
	# First pass: hand every consumer its stop item.
	for consumer in consumers:
		consumer.ready.put(None)
	# Second pass: wait for each thread to finish.
	for consumer in consumers:
		consumer.join()
	pool = None
# Have _free_resources() called when the interpreter exits.
atexit.register(_free_resources)
class Parallel(object):
	"""Pull tasks from a build, decide which are runnable and dispatch them.

	Tasks are executed inline when ``numjobs`` is 1, otherwise they are put on a
	queue shared by ``numjobs`` consumer threads obtained from ``get_pool()``.
	"""

	def __init__(self,bld,j=2):
		"""
		:param bld: build object; this class calls ``bld.total()`` and reads ``bld.keep``
		:param j: number of jobs (consumers requested when a pool is created)
		"""
		self.numjobs=j
		self.bld=bld
		# tasks waiting to be examined by start()
		self.outstanding=[]
		# tasks whose runnable_status() returned ASK_LATER
		self.frozen=[]
		# queue read by get_out(); presumably filled by the tasks themselves
		# through tsk.master once processed - not visible in this class
		self.out=Queue(0)
		# tasks dispatched and not yet collected by get_out()
		self.count=0
		# running counter used for tsk.position; starts at 1
		self.processed=1
		# set by error_handler() when bld.keep is false
		self.stop=False
		# tasks passed to error_handler()
		self.error=[]
		# iterator yielding groups of tasks; never assigned in this class, so
		# it must be set by the caller before start()
		self.biter=None
		# set to True each time get_out() collects a result
		self.dirty=False

	def get_next_task(self):
		"""Remove and return the first outstanding task, or None if there is none."""
		if not self.outstanding:
			return None
		return self.outstanding.pop(0)

	def postpone(self,tsk):
		"""Store ``tsk`` in ``frozen``, randomly at the front or at the back."""
		if random.randint(0,1):
			self.frozen.insert(0,tsk)
		else:
			self.frozen.append(tsk)

	def refill_task_list(self):
		"""Make ``outstanding`` non-empty again, or pull the next group from ``biter``.

		:raises Errors.WafError: when only frozen tasks remain and ``processed``
			has not advanced since the previous time this situation was seen
		"""
		# Back-pressure: collect results while too many tasks are uncollected.
		while self.count>self.numjobs*GAP:
			self.get_out()
		while not self.outstanding:
			if self.count:
				self.get_out()
			elif self.frozen:
				try:
					cond=self.deadlock==self.processed
				except AttributeError:
					# self.deadlock is only created below, on the first visit
					pass
				else:
					if cond:
						msg='check the build order for the tasks'
						for tsk in self.frozen:
							if not tsk.run_after:
								msg='check the methods runnable_status'
								break
						lst=['%s\t-> %r'%(repr(tsk),[id(x)for x in tsk.run_after])for tsk in self.frozen]
						raise Errors.WafError("Deadlock detected: %s%s"%(msg,''.join(lst)))
				# remember the progress counter to detect a stall next time
				self.deadlock=self.processed
			if self.frozen:
				self.outstanding+=self.frozen
				self.frozen=[]
			elif not self.count:
				# next() builtin instead of the Python-2-only .next() method
				self.outstanding.extend(next(self.biter))
				self.total=self.bld.total()
				break

	def add_more_tasks(self,tsk):
		"""Append ``tsk.more_tasks`` (if present and non-empty) to the outstanding list."""
		if getattr(tsk,'more_tasks',None):
			self.outstanding+=tsk.more_tasks
			self.total+=len(tsk.more_tasks)

	def get_out(self):
		"""Block until an item arrives on ``out`` and account for it.

		Unless ``stop`` is set, tasks attached to the item are scheduled too.
		"""
		tsk=self.out.get()
		if not self.stop:
			self.add_more_tasks(tsk)
		self.count-=1
		self.dirty=True

	def error_handler(self,tsk):
		"""Record ``tsk`` as failed and request a stop unless ``bld.keep`` is set."""
		if not self.bld.keep:
			self.stop=True
		self.error.append(tsk)

	def add_task(self,tsk):
		"""Queue ``tsk`` for the consumers, creating the consumer pool on first use."""
		if not hasattr(self,'pool'):
			self.init_task_pool()
		self.ready.put(tsk)

	def init_task_pool(self):
		"""Obtain ``numjobs`` consumers and make them all read from one shared queue.

		:return: the list of consumers, also stored as ``self.pool``
		"""
		pool=self.pool=[get_pool()for i in range(self.numjobs)]
		self.ready=Queue(0)
		def setq(consumer):
			# executed by the consumer thread itself: switch to the shared queue
			consumer.ready=self.ready
		for x in pool:
			# sent through each consumer's current private queue
			x.ready.put(setq)
		return pool

	def free_task_pool(self):
		"""Detach the consumers from the shared queue and return them to the module pool.

		Does nothing when no pool was ever created.
		"""
		def setq(consumer):
			# executed by a consumer thread: get a fresh private queue, then
			# acknowledge by putting this Parallel object on ``out``
			consumer.ready=Queue(0)
			self.out.put(self)
		try:
			pool=self.pool
		except AttributeError:
			pass
		else:
			for x in pool:
				self.ready.put(setq)
			# wait for one acknowledgement per consumer; note that get_out()
			# also decrements self.count each time
			for x in pool:
				self.get_out()
			for x in pool:
				put_pool(x)
			self.pool=[]

	def start(self):
		"""Main scheduling loop: run until there are no tasks left or ``stop`` is set.

		For each task, ``runnable_status()`` decides whether it is postponed,
		skipped, or executed (inline when ``numjobs`` is 1, through the consumer
		pool otherwise). An exception raised by ``runnable_status()`` marks the
		task with ``Task.EXCEPTION`` and is reported to :meth:`error_handler`.
		"""
		self.total=self.bld.total()
		while not self.stop:
			self.refill_task_list()
			tsk=self.get_next_task()
			if not tsk:
				if self.count:
					continue
				else:
					break
			if tsk.hasrun:
				self.processed+=1
				continue
			try:
				st=tsk.runnable_status()
			except Exception:
				self.processed+=1
				if self.stop and not self.bld.keep:
					tsk.hasrun=Task.SKIPPED
					continue
				tsk.err_msg=Utils.ex_stack()
				tsk.hasrun=Task.EXCEPTION
				self.error_handler(tsk)
				continue
			if st==Task.ASK_LATER:
				self.postpone(tsk)
				if self.outstanding:
					# move the tasks this one waits for to the front of the list
					for x in tsk.run_after:
						if x in self.outstanding:
							self.outstanding.remove(x)
							self.outstanding.insert(0,x)
			elif st==Task.SKIP_ME:
				self.processed+=1
				tsk.hasrun=Task.SKIPPED
				self.add_more_tasks(tsk)
			else:
				tsk.position=(self.processed,self.total)
				self.count+=1
				tsk.master=self
				self.processed+=1
				if self.numjobs==1:
					tsk.process()
				else:
					self.add_task(tsk)
		# after an error, collect the results of the tasks still running
		while self.error and self.count:
			self.get_out()
		assert(self.count==0 or self.stop)
		self.free_task_pool()