File: uws.py

"""
Support classes for the universal worker service.
"""

#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import contextlib
import datetime
import os
import pickle 
import shutil
import signal
import tempfile
import threading
import weakref

from twisted.internet import protocol
from twisted.internet import reactor

from gavo import base
from gavo import rsc
from gavo import rscdef
from gavo import svcs
from gavo import utils
from gavo.base import cron
from gavo.protocols import dali

# Ward against typos
from gavo.votable.tapquery import ( #noflake: exported names
	PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, UNKNOWN)

END_STATES = set([COMPLETED, ERROR, ABORTED])

# used in the computation of quote
EST_TIME_PER_JOB = datetime.timedelta(minutes=10)

_DEFAULT_LOCK_TIMEOUT = 0.1


__docformat__ = "restructuredtext en"


def strOrEmpty(aVal):
	"""returns a stringified version of aVal, except an empty string is returned
	when aVal is None.
	"""
	if aVal is None:
		return ""
	else:
		return str(aVal)


class UWSError(base.Error):
	"""UWS-related errors, mainly to communicate with web renderers.

	UWSErrors are constructed with a displayable message (may be None to
	autogenerate one), a jobId (give one; the default None is only there
	to avoid breaking legacy code) and optionally a source exception and a hint.
	"""
	def __init__(self, msg, jobId=None, sourceEx=None, hint=None):
		base.Error.__init__(self, msg, hint=hint)
		self.msg = msg
		self.jobId = jobId
		self.sourceEx = sourceEx
		self.args = [self.msg, self.jobId, self.sourceEx, self.hint]
	
	def __str__(self):
		if self.msg:
			return self.msg
		elif self.sourceEx:
			return "UWS on job %s operation failed (%s, %s)"%(
				self.jobId,
				self.sourceEx.__class__.__name__,
				str(self.sourceEx))
		else:
			return "Unspecified UWS related error (id: %s)"%self.jobId


class JobNotFound(base.NotFoundError, UWSError):
	def __init__(self, jobId):
		base.NotFoundError.__init__(self, str(jobId), "UWS job", "jobs table")
	
	def __str__(self):
		return base.NotFoundError.__str__(self)


class UWS(object):
	"""a facade for a universal worker service (UWS).

	You must construct it with the job class (see UWSJob) and a 
	uwsactions.JobActions instance

	The UWS then provides methods to access the jobs table,
	create jobs and deserialize jobs from the jobs table.

	We have a "statements cache" in here, where we used the UWS table
	defintion to create query strings we can later pass to the database.
	Don't worry about this any more.  Just write text queries when adding
	features.  It's more readable and just about as stable against
	code evolution.

	You must override the getURLForId(jobId) method in your concrete
	implementation.

	You should also override jobdocPreamble and joblistPreamble.  This
	is raw XML that is prepended to job and list documents.  This is primarily
	for processing instructions (PIs) giving stylesheets, but given we don't
	use doctypes you could provide internal subsets there, too.  Anyway, see
	the TAP UWS runner for examples.
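
	A minimal concrete UWS could look like this (a sketch; MyJob stands
	for a job class as described in BaseUWSJob, myJobActions for a
	uwsactions.JobActions instance)::

		class MyUWS(UWS):
			def __init__(self):
				UWS.__init__(self, MyJob, myJobActions)

			def getURLForId(self, jobId):
				return "http://example.org/myuws/%s"%jobId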
	"""
	# how often should we check for jobs that wait for destruction?
	cleanupInterval = 3600*12

	# raw XML to prepend to joblist documents
	joblistPreamble = ""
	# raw XML to prepend to job documents
	jobdocPreamble = ""

	def __init__(self, jobClass, jobActions):
		self.jobClass = jobClass
		self.jobActions = jobActions
		self._statementsCache = None
		cron.runEvery(-self.cleanupInterval, 
			"UWS %s jobs table reaper"%str(self),
			self.cleanupJobsTable)
	
	def _makeMoreStatements(self, statements, jobsTable):
		"""adds custom statements to the canned query dict in derived
		classes.
		"""
		pass

	def _makeStatementsCache(self):
		"""returns a dictionary containing canned queries to manipulate
		the jobs table for this UWS.
		"""
		res = {}
		td = self.jobClass.jobsTD
		with base.getTableConn() as conn:
			jobsTable = rsc.TableForDef(td, connection=conn, exclusive=True)
			res["getByIdEx"] = jobsTable.getQuery(td, "jobId=%(jobId)s", {"jobId": 0})
			res["feedToIdEx"] = None, jobsTable.addCommand, None
			res["deleteByIdEx"] = None, jobsTable.getDeleteQuery(
				"jobId=%(jobId)s")[0], None

			jobsTable = rsc.TableForDef(td, connection=conn)
			res["getById"] = jobsTable.getQuery(td, "jobId=%(jobId)s", {"jobId": 0})
			res["getAllIds"] = jobsTable.getQuery(
				[td.getColumnByName("jobId")], "")

			countField = base.makeStruct(
				svcs.OutputField, name="count", type="integer", select="count(*)",
					required=True)
			res["countRunning"] = jobsTable.getQuery([countField],
				"phase='EXECUTING'")
			res["countQueued"] = jobsTable.getQuery([countField],
				"phase='QUEUED'")

			self._makeMoreStatements(res, jobsTable)
		return res

	@property
	def _statements(self):
		"""returns a dictionary of canned statements manipulating the jobs
		table.
		"""
		if self._statementsCache is None:
			self._statementsCache = self._makeStatementsCache()
		return self._statementsCache

	def runCanned(self, statementId, args, conn):
		"""runs the canned statement statementId with args through the
		DB connection conn.

		This will return row dictionaries of the result if there is a result.
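
		For instance (a sketch)::

			rows = self.runCanned("getById", {"jobId": jobId}, conn)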
		"""
		resultTableDef, statement, _ = self._statements[statementId]
		cursor = conn.cursor()

		try:
			cursor.execute(statement, args)
		except base.QueryCanceledError:
			conn.rollback()
			raise base.ReportableError("Could not access the jobs table."
				" This probably means there is a stale lock on it.  Please"
				" notify the service operators.")

		res = None
		if resultTableDef:
			res = [resultTableDef.makeRowFromTuple(row)
				for row in cursor]
		cursor.close()
		return res

	def _serializeProps(self, props, writableConn):
		"""inserts (or overwrites) the job properties props through
		writableConn.
		"""
		self.runCanned("feedToIdEx", props, writableConn)

	def getNewJobId(self, **kws):
		"""creates a new job and returns its id.

		kws can be properties of the new job or the special key timeout
		giving after how many seconds we should give up trying to lock
		the db.
		"""
		timeout = kws.pop("timeout", _DEFAULT_LOCK_TIMEOUT)

		try:
			with base.getWritableTableConn() as conn:
				# We fire off a quick pointless query to catch server restarts;
				# if this fails, the connection pools are cleared and the next
				# queries will run again.
				conn.execute("SELECT table_name FROM TAP_SCHEMA.tables LIMIT 1")
		except base.DBError:
			pass

		with base.getWritableTableConn() as conn:
			with conn.parameters([
					("statement_timeout", "%s ms"%int(timeout*1000))]):
				props = self.jobClass.getDefaults(conn)
				props["jobId"] = self.jobClass.getNewId(self, conn)
				job = self.jobClass(props, self, writable=True)
				job.change(**kws)
				self._serializeProps(job.getProperties(), conn)
		return job.jobId

	def getNewIdFromRequest(self, request, service):
		"""returns the id of a new TAP job created from request.

		Request has to be a t.w request or similar, but preprocessed
		by the renderer to have a uwsArgs attribute; see taprender for how this
		is to be done.
		"""
		jobId = self.getNewJobId()
		try:
			with self.changeableJob(jobId) as wjob:
				wjob.setParamsFromDict(request.uwsArgs)

				if request.getAuthUser():
					wjob.change(owner=request.getUser())
		except:
			# something went wrong while setting up the request; do away with it
			# again
			self.destroy(jobId)
			raise
		return jobId

	def _getJob(self, jobId, conn, writable=False):
		"""helps getJob and getNewJob.
		"""
		# Caution: this code is copied in useruws.UserUWS._getJob
		# If you find you need to change this, this may be a good
		# time to figure out how to refactor this method.
		statementId = 'getById'
		if writable:
			statementId = 'getByIdEx'
		res = self.runCanned(statementId, {"jobId": jobId}, conn)
		if len(res)!=1:
			raise JobNotFound(jobId)
		return self.jobClass(res[0], self, writable)

	def getJob(self, jobId):
		"""returns a read-only UWSJob for jobId.

		Note that by the time you do something with the information here,
		the "true" state in the database may already be different.  There
		should be no way to write whatever information you have in here,
		so any "racing" here shouldn't hurt.
		"""
		with base.getTableConn() as conn:
			return self._getJob(jobId, conn)

	def getNewJob(self, **kws):
		"""creates a new job and returns a read-only instance for it.
		"""
		newId = self.getNewJobId(**kws)
		return self.getJob(newId)

	@contextlib.contextmanager
	def changeableJob(self, jobId, timeout=_DEFAULT_LOCK_TIMEOUT):
		"""a context manager for job manipulation.

		This is done such that any changes to the job's properties
		within the controlled section get propagated to the database.
		As long as you are in the controlled section, nobody else
		can change the job.
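
		A typical use might look like this (a sketch; uws is a UWS
		instance)::

			with uws.changeableJob(jobId) as wjob:
				wjob.change(phase=QUEUED)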
		"""
		with base.getWritableTableConn() as conn:
			with conn.parameters([
					("statement_timeout", "%s ms"%int(timeout*1000))]):
				job = self._getJob(jobId, conn, writable=True)
				try:
					yield job
				except:
					conn.rollback()
					raise
				else:
					self._serializeProps(job.getProperties(), conn)
					conn.commit()

	def changeToPhase(self, jobId, newPhase, input=None, 
			timeout=_DEFAULT_LOCK_TIMEOUT):
		with self.changeableJob(jobId, timeout=timeout) as wjob:
			try:
				transition = wjob.getTransitionTo(newPhase)
				return transition(newPhase, wjob, input)
			except Exception as exception:
				# transition to error if possible.  If that fails as well,
				# blindly set error and give up.
				try:
					if newPhase!=ERROR:
						return wjob.getTransitionTo(ERROR)(ERROR, wjob, exception)
				except:
					wjob.change(phase=ERROR, error=exception)
				raise

	def destroy(self, jobId):
		"""removes the job with jobId from the UWS.

		This calls the job's prepareForDestruction method while the job is writable.
		"""
		try:
			try:
				with self.changeableJob(jobId) as job:
					job.prepareForDestruction()
			except Exception as exc:
				base.ui.notifyWarning(
					"Ignored error while destroying UWS job %s: %s"%(jobId, exc))
		finally:
			with base.getWritableTableConn() as conn:
				self.runCanned("deleteByIdEx", locals(), conn)

	def _countUsingCanned(self, statementId):
		"""helps the count* methods.
		"""
		with base.getTableConn() as conn:
			return self.runCanned(statementId, {}, conn)[0]["count"]

	def countRunningJobs(self):
		"""returns the number of EXECUTING jobs in the jobsTable.
		"""
		return self._countUsingCanned('countRunning')

	def countQueuedJobs(self):
		"""returns the number of QUEUED jobs in jobsTable.
		"""
		return self._countUsingCanned('countQueued')

	def getJobIds(self):
		"""returns a list of all currently existing job ids.
		"""
		with base.getTableConn() as conn:
			return [r["jobId"] for r in self.runCanned('getAllIds', {}, conn)]

	def getIdsAndPhases(self, owner=None, phase=None, last=None, after=None,
			initFragments=None, initPars=None):
		"""returns pairs for id and phase for all jobs in the UWS.

		phase, last, after are the respective parameters from UWS 1.1.
		"""
		pars = locals()
		fragments = initFragments or []
		pars.update(initPars or {})
		limits = None

		if owner is not None:
			fragments.append("owner=%(owner)s")

		if phase is not None:
			fragments.append("phase=%(phase)s")

		if last is not None:
			limits = "ORDER BY creationTime DESC LIMIT %(limit)s"
			pars['limit'] = last

		if after is not None:
			fragments.append("creationTime>%(after)s")

		td = self.jobClass.jobsTD

		with base.getTableConn() as conn: 
			return conn.query(td.getSimpleQuery(["jobId", "phase"],
				fragments=base.joinOperatorExpr("AND", fragments),
				postfix=limits), pars)

	def cleanupJobsTable(self, includeFailed=False, includeCompleted=False,
			includeAll=False, includeForgotten=False):
		"""removes expired jobs from the UWS jobs table.

		The constructor arranged for this to be called now and then
		(cleanupInterval class attribute, defaulting to 3600*12).

		The functionality is also exposed through gavo admin cleanuws; this
		also lets you use the includeFailed and includeCompleted flags.  These
		should not be used on production services since you'd probably nuke
		jobs still interesting to your users.
		"""
		with base.AdhocQuerier() as q:
			if not q.getTableType(self.jobClass.jobsTD.getQName()):
				# no jobs table, nothing to clean up
				return

		phasesToKill = set()
		if includeFailed or includeAll:
			phasesToKill.add(ERROR)
			phasesToKill.add(ABORTED)
		if includeCompleted or includeAll:
			phasesToKill.add(COMPLETED)
		if includeAll:
			phasesToKill.add(PENDING)
			phasesToKill.add(QUEUED)
		if includeForgotten:
			phasesToKill.add(PENDING)

		fragments = "destructionTime<%(now)s"
		if phasesToKill:
			fragments = "destructionTime<%(now)s or phase in %(ptk)s"

		for row in self.jobClass.jobsTD.doSimpleQuery(
				['jobId'], 
				fragments,
				{"now": datetime.datetime.utcnow(), "ptk": phasesToKill}):
			jobId = row["jobId"]
			try:
				self.destroy(jobId)
			except base.QueryCanceledError: # job locked by something, don't hang
				base.ui.notifyWarning("Postponing destruction of %s: Locked"%
					jobId)
			except JobNotFound:
				# Someone else has cleaned up -- that's ok
				pass

	def getURLForId(self, jobId):  # pragma: no cover
		"""returns the handling URL for the job with jobId.

		You must override this in deriving classes.
		"""
		raise NotImplementedError("Incomplete UWS (getURLForId not overridden).")


class UWSWithQueueing(UWS):
	"""A UWS with support for queueing.

	Queueing is done on UWS level rather than at transitions.  With a plain
	UWS, if something is put on the queue, it must be started by the
	Transition's queueJob method.

	With UWSWithQueueing, you just mark the job queued and the rest is
	taken care of by the UWS itself.
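
	Queueing a job then is no more than (a sketch; uws being a
	UWSWithQueueing instance)::

		uws.changeToPhase(jobId, QUEUED)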
	"""
	# _processQueueDirty is managed through scheduleProcessQueueCheck
	_processQueueDirty = False
	# How many jobs will the UWS (try to) run at the same time?
	runcountGoal = 1

	def __init__(self, jobClass, actions):
		# processQueue shouldn't strictly need a lock.  The lock mainly
		# protects against running more unqueuers than necessary
		self._processQueueLock = threading.Lock()
		UWS.__init__(self, jobClass, actions)

	def _makeMoreStatements(self, statements, jobsTable):
		UWS._makeMoreStatements(self, statements, jobsTable)
		td = jobsTable.tableDef

		countField = base.makeStruct(
			svcs.OutputField, name="count", type="integer", select="count(*)",
				required=True)

		statements["countQueuedBefore"] = jobsTable.getQuery(
			[countField],
			"phase='QUEUED' and destructionTime<=%(dt)s",
			{"dt": None})

		statements["getIdsScheduledNext"] = jobsTable.getQuery(
			[jobsTable.tableDef.getColumnByName("jobId")],
			"phase='QUEUED'",
			limits=('ORDER BY destructionTime ASC', {}))

		statements["getHungCandidates"] = jobsTable.getQuery([
			td.getColumnByName("jobId"),
			td.getColumnByName("pid")],
			"phase='EXECUTING'")

	def scheduleProcessQueueCheck(self):
		"""tells TAP UWS to try and dequeue jobs next time checkProcessQueue
		is called.

		This function exists since during the TAPTransistions there's
		a writable job and processing the queue might deadlock.  So, rather
		than processing right away, we just note something may need to be
		done.
		"""
		self._processQueueDirty = True

	def checkProcessQueue(self):
		"""sees if any QUEUED process can be made EXECUTING.

		This must be called while you're not holding any changeableJob.
		"""
		if self._processQueueDirty:
			self._processQueueDirty = False
			self._processQueue()

	def _processQueue(self):
		"""tries to take jobs from the queue.

		This function is called from checkProcessQueue when we think
		a job may have moved from EXECUTING to somewhere else.

		Currently, the jobs with the earliest destructionTime are processed
		first.  That's, of course, completely ad-hoc.
		"""
		if not self._processQueueLock.acquire(False):
			# There's already an unqueuer running, don't need a second one
			# Note that other processes (e.g., taprunner) might still be manipulating
			# the jobs table, so don't rely on the tables not changing here.
			return
		else:
			try:
				if self.countQueuedJobs()==0:
					return

				try:
					started = 0
					with base.getTableConn() as conn:
						toStart = [row["jobId"] for row in
							self.runCanned('getIdsScheduledNext', {}, conn)]

					while toStart:
						if self.countRunningJobs()>=self.runcountGoal:
							break
						self.changeToPhase(toStart.pop(0), EXECUTING)
						started += 1
					
					if started==0:
						# No jobs could be started.  This may be fine when long-running
						# jobs block job submission, but for catastrophic slave
						# failures we want to make sure all jobs we think are executing
						# actually are.  If they've silently died, we log that and
						# push them to error.
						# We only want to do that if we're the server -- any other
						# process couldn't see the pids anyway.
						if base.IS_DACHS_SERVER:
							self._ensureJobsAreRunning()
				except Exception:
					base.ui.notifyError("Error during queue processing, "
						" the UWS %s is probably botched now."%self.__class__.__name__)
			finally:
				self._processQueueLock.release()

	def _ensureJobsAreRunning(self):
		"""pushes all executing slave jobs that silently died to ERROR.
		"""
		with base.getTableConn() as conn:
			for row in self.runCanned("getHungCandidates", {}, conn):
				jobId, pid = row["jobId"], row["pid"]

				if pid is None:
					self.changeToPhase(jobId, "ERROR",
						UWSError("EXECUTING job %s had no pid."%jobId, jobId))
					base.ui.notifyError("Stillborn async slave %s"%jobId)
				elif pid<2:
					# these are jobs run within the server -- we can't do
					# anything about them using process manipulation (if at all,
					# they may be realised as threads).  Skip them here.
					continue
				else:
					pass
# We should be checking if the process is still running.  Alas,
# there's serious syncing issues here that need to be investigated.
# Let's rely on the slaves cleaning up behind themselves.
#					try:
#						os.waitpid(pid, os.WNOHANG)
#					except os.error as ex: # child presumably is dead
#						# the following doesn't hurt if the job has gone to COMPLETED
#						# in the meantime -- we don't transition *from* COMPLETED.
#						self.changeToPhase(jobId, "ERROR",
#							uws.UWSError("EXECUTING job %s has silently died."%jobId, jobId))
#						base.ui.notifyError("Zombie taprunner: %s"%jobId)

	def changeToPhase(self, jobId, newPhase, input=None, timeout=10):
		"""overridden here to hook in queue management.
		"""
		UWS.changeToPhase(self, jobId, newPhase, input, timeout)
		self.checkProcessQueue()


class ParameterRef(object):
	"""A UWS parameter that is (in effect) a URL.

	This always contains a URL.  In case of uploads, the tap renderer makes
	sure the upload is placed into the upload directory and generates a
	URL; in that case, local is True.

	You need this class when you want the byReference attribute in
	the UWS.parameter element to be true.
	"""
	def __init__(self, url, local=False):
		self.local = local
		self.url = url


class JobParameter(object):
	"""A UWS job parameter.

	Job parameters are (normally) elements of a job's parameters dictionary,
	i.e., usually elements of the job control language.  "Magic" parameters
	that allow automatic serialization or special effects on the job
	can be defined using the _parameter_name class attributes of
	UWSJobs.  They are employed when using the setSerializedPar and
	getSerializedPar interface.

	All methods of these are class or static methods since
	JobParameters are never instantiated.

	The default value setter enforces a maximum length of 50000 characters
	for incoming strings.  This is a mild shield against accidental DoS
	with, e.g., bad uploads in TAP.

	The serialization thing really belongs to the user-facing interface.
	However, since it's most convenient to have these parameters
	in the UWSJob classes, the class is defined here.

	Internal clients would read the parameters directly from the dictionary.

	Methods you can override:

		- addPar(value, job) -> None -- parse value and perform some action
			(typically, set an attribute) on job.  The default implementation
			puts a value into the parameters dictionary _deserialized.
		- getPar(job) -> string -- infer a string representation of the
			job parameter on job.  The default implementation gets
			the value from the parameter from the parameters dictionary and
			_serializes it.
		- _deserialize(str) -> val -- turns something from an XML tree
		  into a python value as usable by the worker
		- _serialize(val) -> str -- yields some XML-embeddable serialization
		  of val

	The default implementation just dumps/gets name from the job's
	parameters dict.  This is the behaviour for non-magic parameters
	since (get|set)SerializedPar falls back to the base class.

	CAUTION: Do *not* say job.parameters[bla] = "ab" -- your changes
	will get lost because serialisation of the parameters dictionary must
	be initiated manually.  Always manipulate the parameters dictionary
	by using cls.updatePar(name, value, job) or a suitable
	facade (job.setPar, job.setSerializedPar).
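
	For illustration, a magic integer-valued parameter might look like
	this (a sketch)::

		class IntParameter(JobParameter):
			_deserialize = staticmethod(int)
			_serialize = staticmethod(str)

	It would be attached to a job class through a _parameter_<name>
	class attribute.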
	"""
	_deserialize, _serialize = staticmethod(strOrEmpty), staticmethod(strOrEmpty)

	@classmethod
	def updatePar(cls, name, value, job):
		"""enters name:value into job's parameter dict.

		See the uws.JobParameter's docstring.
		"""
		# this is a bit magic because job.parameters is only re-encoded
		# on demand
		parameters = job.parameters
		if isinstance(value, str):
			value = cls._deserialize(value)
		parameters[name] = value
		# see our docstring
		job.change(parameters=parameters)

	@classmethod
	def addPar(cls, name, value, job):
		# this is a somewhat lame protection against file uploads
		# gone haywire: ignore any values longer than 50k
		if isinstance(value, str) and len(value)>50000:
			base.ui.notifyWarning("UWS Parameter %s discarded as too long; first"
				" bytes: %s"%(name, repr(value[:20])))
			return
		cls.updatePar(name, cls._deserialize(value), job)

	@classmethod
	def getPar(cls, name, job):
		return cls._serialize(job.parameters.get(name))


class UploadParameter(JobParameter):
	"""A generic DALI-style upload facility.

	We add this to all UWS job classes when their underlying services have
	a file-typed input key.  It will contain some somewhat arbitrary string
	that lets people guess what they uploaded.  TAP does this a bit
	differently from useruws, which tries a somewhat rationalised approach.
	"""
	# the implementation is messy -- as for inline uploads, two parameters
	# are involved (UPLOAD and the file parameter) and the normal UWS parameter
	# interface only passes the parameter to be processed, we need to steal
	# the request from upstack.   This, admittedly, is ugly, but then
	# the UPLOAD design is botched, so I feel entitled to play it dirty
	# rather than spoil my design.
	@classmethod
	def _deserialize(cls, value):
		if value is None:
			return []
		return value.split("/")

	@classmethod
	def _serialize(cls, value):
		if value is None:
			return ""
		return "/".join(value)

	@classmethod
	def addPar(cls, name, value, job):
		if not value.strip():
			return

		for newFName in dali.writeUploadBytesTo(
				utils.stealVar("request"), job.getWD()):
			job.setPar(newFName, newFName)


class FileParameter(JobParameter):
	"""an uploaded file.

	These are being created by posting to upload in the current design;
	hence, we fail on an attempt to addPar those.  The serialisation
	yields ParameterRefs.

	Note that TAP uploads currently employ a different scheme since TAP
	uploads don't match what DALI says.

	The stored values are always URLs into our service, regardless of where
	the upload came from.  For simplicity, we store the things in results.

	TODO: We should preserve the media type of the upload where available.
	"""
	@classmethod
	def _serialize(cls, value):
		if value is None:
			return ""
		return ParameterRef(value)

	@classmethod
	def updatePar(cls, name, value, job):
		# value is the file name (that happens to be the name of the input key;
		# in DALI terms, it's what's in front of the comma).
		JobParameter.updatePar(name, job.getURL()+"/results/"+value, job)

	@classmethod
	def addPar(cls, name, value, job):
		raise base.ValidationError("File parameters cannot be set by posting to"
			" them.  Use DALI-style UPLOADs for them.", name)


class UWSJobType(type):
	"""The metaclass for UWS jobs.

	We have the metaclass primarily because we want to delay loading
	the actual definition until it is actually needed (otherwise we
	might get interesting chicken-egg-problems with rscdesc at some point).

	A welcome side effect is that we can do custom constructors and
	similar cosmetic deviltry.
	"""
	@property
	def jobsTD(cls):
		try:
			return cls._jobsTDCache
		except AttributeError:
			cls._jobsTDCache = base.resolveCrossId(cls._jobsTDId, rscdef.TableDef)
			return cls._jobsTDCache
	

class BaseUWSJob(object, metaclass=UWSJobType):
	"""An abstract UWS job.

	UWS jobs are always instantiated with a row from the associated
	jobs table (i.e. a dictionary giving all the uws properties).  You
	can read the properties as attributes.  UWSJobs also keep
	a (weak) reference to the UWS that made them.

	To alter uws properties, use the change method.  This will fail unless
	the job was created giving writable=True.

	To make it concrete, you need to define:

	- a _jobsTDId attribute giving the (cross) id of the UWS jobs
	  table for this kind of job
	- a _transitions attribute giving a UWSTransitions instance that defines
	  what to do on transitions
	- as needed, class methods _default_<parName> if you need to default
	  job parameters in newly created jobs
	- as needed, methods _decode_<parName> and _encode_<parName>
	  to bring uws parameters (i.e., everything that has a DB column)
	  between their DB representation and *python* values.

	You may want to override:

	- a class method getNewId(uws, writableConn) -> str, a method 
	  allocating a unique id for a new job and returning it.  Beware
	  of races here; if your allocation is through the database table,
	  you'll have to lock it *and* write a preliminary record with your new
	  id.  The default implementation does this, but if you want
	  something in the file system, you probably don't want to
	  use that.
	- a method setParamsFromDict(argDict), which takes a dictionary 
	  (in general the result of a contextgrammar) and sets the job parameters.
	  This is only necessary if you need extra mappings between names and such.

	For every piece of the job parameters, define
	class attributes _parameter_<parname.lower()> with JobParameter
	values saying how they are serialized and deserialized.   Only
	parameters defined in this way are accepted and integrated
	into the parameters dict.
	
	If you need to clean up before the job is torn down, redefine
	the prepareForDestruction method.
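
	In outline, a concrete job class could look like this (a sketch;
	the table id and the transitions object are assumptions)::

		class MyJob(BaseUWSJob):
			_jobsTDId = "//myres#myjobs"
			_transitions = SimpleUWSTransitions("MyJob")
			_parameter_query = JobParameter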
	"""

	def __init__(self, props, uws, writable=False):
		object.__setattr__(self, "_props", props)
		self.writable = writable
		self.uws = weakref.proxy(uws)

	def __getattr__(self, name):
		if name in self._props:
			return getattr(self, "_decode_"+name, utils.identity)(
				self._props[name])
		raise AttributeError("%s objects have no attribute '%s'"%(
			self.__class__.__name__, name))

	def __setattr__(self, name, value):
		# ward against tempting bugs, disallow assigning to names in _props:
		if name in self._props:
			raise TypeError("Use the change method to change the %s"
				" attribute."%name)
		object.__setattr__(self, name, value)

	@property
	def quote(self):
		"""Always returns None.

		Override if you have a queue management.
		"""
		return None

	@classmethod
	def getNewId(cls, uws, conn):
		cursor = conn.cursor()
		tableName = cls.jobsTD.getQName()
		cursor.execute("LOCK TABLE %s IN ACCESS SHARE MODE"%tableName)
		try:
			while True:
				newId = utils.getRandomString(10)
				cursor.execute("SELECT * FROM %s WHERE jobId=%%(jobId)s"%tableName,
					{"jobId": newId})
				if not list(cursor):
					cursor.execute(
						"INSERT INTO %s (jobId) VALUES (%%(jobId)s)"%tableName,
						{"jobId": newId})
					break
			cursor.close()
			conn.commit()
		except:
			conn.rollback()
			raise
		return newId

	@classmethod
	def getDefaults(cls, conn):
		"""returns a dictionary suitable for inserting into a jobsTD table.
		"""
		res = {}
		for column in cls.jobsTD:
			name = column.name
			res[name] = getattr(cls, "_default_"+name, lambda: None)()
		return res

	@classmethod
	def _default_phase(cls):
		return PENDING

	@classmethod
	def _default_executionDuration(cls):
		return base.getConfig("async", "defaultExecTime")

	@classmethod
	def _default_creationTime(cls):
		return datetime.datetime.utcnow()

	@classmethod
	def _default_destructionTime(cls):
		return datetime.datetime.utcnow()+datetime.timedelta(
			seconds=base.getConfig("async", "defaultLifetime"))

	def _encode_error(self, value):
		"""returns a pickled dictionary with error information.

		value can either be an exception object or a dictionary as
		generated here.
		"""
		if value is None:
			return None
		if not isinstance(value, dict):
			value = {
				"type": value.__class__.__name__,
				"msg": str(value),
				"hint": getattr(value, "hint", None),
			}
		return utils.getCleanBytes(pickle.dumps(value)).decode("ascii")

	def _decode_error(self, value):
		"""returns the unpickled three-item dictionary from the database string.
		"""
		if value is None:
			return None
		return pickle.loads(utils.getDirtyBytes(value))

	@classmethod
	def _default_parameters(cls):
		return utils.getCleanBytes(pickle.dumps({}, protocol=2)).decode("ascii")
	
	def _encode_parameters(self, value):
		"""(db format for parameters is a pickle)
		"""
		return utils.getCleanBytes(pickle.dumps(value, protocol=2)).decode("ascii")
	
	def _decode_parameters(self, value):
		"""(db format for parameters is a pickle)
		"""
		return pickle.loads(utils.getDirtyBytes(value))

	def _getParameterDef(self, parName):
		"""returns the job/uws parameter definition for parName and the name 
		the parameter will actually be stored as.

		All these parameters are forced to be lower case (and thus
		case-insensitive).  The actual storage name of the parameter is
		still returned in case saner conventions may be forthcoming.
		"""
		parName = parName.lower()
		name = "_parameter_"+parName
		if hasattr(self, name):
			return getattr(self, name), parName
		return JobParameter, parName

	def setSerializedPar(self, parName, parValue):
		"""enters parName:parValue into self.parameters after deserializing it.

		This is for use when input comes from text; use setPar for values
		already parsed.
		"""
		parDef, name = self._getParameterDef(parName)
		parDef.addPar(name, parValue, self)

	def setPar(self, parName, parValue):
		"""enters parName:parValue into self.parameters.
		"""
		parDef, name = self._getParameterDef(parName)
		parDef.updatePar(name, parValue, self)

	def getSerializedPar(self, parName):
		"""returns self.parameters[parName] in text form.

		This is for use from a text-based interface.  Workers read from
		parameters directly.
		"""
		parDef, name = self._getParameterDef(parName)
		return parDef.getPar(name, self)

	def iterSerializedPars(self):
		"""iterates over the serialized versions of the parameters.
		"""
		for key in self.iterParameterNames():
			yield key, self.getSerializedPar(key)

	def iterParameterNames(self):
		"""iterates over the names of the parameters declared for the job.
		"""
		for n in dir(self):
			if n.startswith("_parameter_"):
				yield n[11:]

	def setParamsFromDict(self, argDict):
		"""sets our parameters from a dictionary of string lists.

		self must be writable for this to work.

		argDict should in general be the result of a contextgrammar,
		but, really, any dict will do.
		"""
		for key in self.iterParameterNames():
			if key in argDict:
				val = argDict[key]
				# TODO: handling multiple arguments must be way better thought
				# than just blank-joining them.  We *could* pickle lists, of
				# course... Well, let's wait for when we have a good case.
				if isinstance(val, list):
					val = " ".join(val)
				if not val:
					# have some way to re-set a parameter?  Anyway, I need to
					# ignore empty parameters or my XSLT form will break
					continue
				self.setSerializedPar(key, val)

	def getTransitionTo(self, newPhase):
		"""returns the action prescribed to push self to newPhase.

		A ValidationError is raised if no such transition is defined.
		"""
		return self._transitions.getTransition(self.phase, newPhase)

	def change(self, **kwargs):
		"""changes the property values to what's given by the keyword arguments.

		It is an AttributeError to try and change a property that is not defined.
		"""
		if not self.writable:
			raise TypeError("Read-only UWS job (You can only change UWSJobs"
				"obtained through changeableJob.")
		for propName, propValue in kwargs.items():
			if propName not in self._props:
				raise AttributeError("%ss have no attribute %s"%(
					self.__class__.__name__, propName))
			self._props[propName] = getattr(self, "_encode_"+propName,
				utils.identity)(propValue)

	def getProperties(self):
		"""returns the properties of the job as they are stored in the
		database.

		Use attribute access to read them and change to change them.  Do
		*not* get values from the dictionary you get and do *not* change
		the dictionary.
		"""
		return self._props

	def update(self):
		"""fetches a new copy of the job props from the DB.

		You should in general not need this, since UWSJob objects are intended
		to be short-lived ("for the duration of an async request").  Still,
		for testing and similar, it's convenient to be able to update
		a UWS job from the database.
		"""
		self._props = self.uws.getJob(self.jobId)._props

	def prepareForDestruction(self):
		"""is called before the job's database row is torn down.

		Self is writable at this point.
		"""

	def getURL(self):
		"""returns the UWS URL for this job.
		"""
		return self.uws.getURLForId(self.jobId)

	@contextlib.contextmanager
	def getWritable(self):
		"""a context manager for a writeable version of the job.

		Changes will be written back at the end, and the job object itself
		will be updated from the database.

		If self already is writable, it is returned unchanged, and changes
		are only persisted when the enclosing controlling block finishes.
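
		A sketch of typical use::

			job = someUWS.getJob(jobId)
			with job.getWritable() as wjob:
				wjob.change(executionDuration=3600)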
		"""
		if self.writable:
			yield self
			return

		with self.uws.changeableJob(self.jobId) as wjob:
			yield wjob
		self.update()


class UWSJobWithWD(BaseUWSJob):
	"""A UWS job with a working directory.

	This generates ids from directory names in a directory (the
	uwsWD) shared for all UWSes on the system.

	It also adds methods 
	
	- getWD() -> str returning the working directory
	- addResult(self, source, mimeType, name=None) to add a new
	  result
	- openResult(self, mimeType, name) -> file to get an open file in the
	  WD to write to in order to generate a result
	- getResult(self, resName) -> str to get the *path* of a result with
	  resName
	- getResults(self) -> list-of-dicts to get dicts describing all
	  results available
	- openFile(self) -> file to get a file letting you read an existing
	  result.
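
	For instance, writing a result could look like this (a sketch)::

		with job.openResult("text/plain", "log.txt") as destF:
			destF.write(b"all went fine")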
	"""
	@classmethod
	def getNewId(cls, uws, conn):
		# our id is the base name of the job's temporary directory.
		uwsWD = base.getConfig("uwsWD")
		utils.ensureDir(uwsWD, mode=0o775, setGroupTo=base.getGroupId())
		jobDir = tempfile.mkdtemp("", "", dir=uwsWD)
		return os.path.basename(jobDir)

	def getWD(self):
		return os.path.join(base.getConfig("uwsWD"), self.jobId)

	def prepareForDestruction(self):
		shutil.rmtree(self.getWD())

	# results management: We use a pickled list in the jobs dir to manage 
	# the results.  I once had a table of those in the DB and it just
	# wasn't worth it.  One issue, though: this potentially races
	# if two different processes/threads were to update the results
	# at the same time.  This could be worked around by writing
	# the results pickle only from within changeableJobs.
	# 
	# The list contains dictionaries having resultName and resultType keys.
	@property
	def _resultsDirName(self):
		return os.path.join(self.getWD(), "__RESULTS__")

	def _loadResults(self):
		try:
			with open(self._resultsDirName, "rb") as f:
				return pickle.load(f)
		except IOError:
			return []

	def _saveResults(self, results):
		handle, srcName = tempfile.mkstemp(dir=self.getWD())
		with os.fdopen(handle, "wb") as f:
			pickle.dump(results, f)
		# The following operation will bomb on windows when the second
		# result is saved.  Tough luck.
		os.rename(srcName, self._resultsDirName)

	def _addResultInJobDir(self, mimeType, name):
		resTable = self._loadResults()
		newRec = {'resultName': name, 'resultType': mimeType}

		for index, res in enumerate(resTable):
			if res["resultName"]==name:
				resTable[index] = newRec
				break
		else:
			resTable.append(newRec)

		self._saveResults(resTable)

	def fixTypeForResultName(self, resultName, mediaType):
		"""sets the media type for result resultName.

		It is not an error if no result with resultName exists.
		"""
		resTable = self._loadResults()
		for row in resTable:
			if row["resultName"]==resultName:
				row["resultType"] = mediaType
		self._saveResults(resTable)

	def addResult(self, source, mimeType, name=None):
		"""adds a result, with data taken from source.

		source may be a file-like object, a string, or bytes.

		If no name is passed, a name is auto-generated.
		"""
		if name is None:
			name = utils.intToFunnyName(id(source))
		with open(os.path.join(self.getWD(), name), "wb") as destF:
			if isinstance(source, str):
				source = utils.bytify(source)
			if isinstance(source, bytes):
				destF.write(source)
			else:
				utils.cat(source, destF)
		self._addResultInJobDir(mimeType, name)

	def openResult(self, mimeType, name):
		"""returns a writable file that adds a result.
		"""
		self._addResultInJobDir(mimeType, name)
		return open(os.path.join(self.getWD(), name), "wb")

	def getResult(self, resName):
		"""returns a pair of file name and mime type for a named job result.

		If the result does not exist, a NotFoundError is raised.
		"""
		res = [r for r in self._loadResults() if resName==r["resultName"]]
		if not res:
			raise base.NotFoundError(resName, "job result",
				"uws job %s"%self.jobId)
		res = res[0]
		return os.path.join(self.getWD(), res["resultName"]), res["resultType"]

	def getResults(self):
		"""returns a list of this service's results.

		The list contains dictionaries having at least resultName and resultType
		keys.
		"""
		return self._loadResults()

	def openFile(self, name, mode="rb"):
		"""returns an open file object for a file within the job's work directory.

		No path parts are allowed on name.
		"""
		if "/" in name:
			raise ValueError("No path components allowed on job files.")
		return open(os.path.join(self.getWD(), name), mode)


class UWSTransitions(object):
	"""An abstract base for classes defining the behaviour of a UWS.

	This basically is the definition of a finite state machine with
	arbitrary input (which is to say: the input "alphabet" is up to
	the transitions).
	
	A UWSTransitions instance is in the transitions attribute of a job
	class.

	The main interface to UWSTransitions is getTransition(p1, p2) -> callable.
	It returns a callable that should push the automaton from phase p1
	to phase p2 or raise a ValidationError for the field phase.

	The callable has the signature f(desiredPhase, wjob, input) -> None.
	It must alter the uwsJob object as appropriate.  input is some object
	defined by the transition.  The job passed is a changeable job,
	so the handlers actually hold locks to the job row.  Thus, be brief.

	The transitions are implemented as simple methods having the signature
	of the callables returned by getTransition.  
	
	To link transitions and methods, pass a vertices list to the constructor.
	This list consists of 3-tuples of strings (from, to, method-name).  From and
	to are phase names (use the symbols from this module to ward against typos).
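
	For example, linking two transitions to (assumed) methods
	(a sketch)::

		trans = UWSTransitions("Demo", [
			(PENDING, QUEUED, "queueJob"),
			(QUEUED, EXECUTING, "startJob")])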
	"""
	def __init__(self, name, vertices):
		self.name = name
		self._buildTransitions(vertices)
	
	def _buildTransitions(self, vertices):
		self.transitions = {}
		# set some defaults
		for phase in [PENDING, QUEUED, EXECUTING, ERROR, ABORTED, COMPLETED]:
			self.transitions.setdefault(phase, {})[ERROR] = "flagError"
		self.transitions.setdefault(EXECUTING, {})[COMPLETED] = "noteEndTime"

		for fromPhase, toPhase, methodName in vertices:
			self.transitions.setdefault(fromPhase, {})[toPhase] = methodName
	
	def getTransition(self, fromPhase, toPhase):
		if (fromPhase==toPhase or
				fromPhase in END_STATES):
			# ignore null or ignorable transitions
			return lambda p, job, input: None
		try:
			methodName = self.transitions[fromPhase][toPhase]
		except KeyError:
			raise base.ui.logOldExc(
				base.ValidationError("No transition from %s to %s defined"
				" for %s jobs"%(fromPhase, toPhase, self.name),
				"phase", hint="This almost always points to an implementation error"))
		try:
			return getattr(self, methodName)
		except AttributeError:
			raise base.ui.logOldExc(
				base.ValidationError("%s Transitions have no %s methods"%(self.name,
				methodName),
				"phase", hint="This is an error in an internal protocol definition."
				"  There probably is nothing you can do but complain."))

	def noOp(self, newPhase, job, ignored):
		"""a sample action just setting the new phase.

		This is a no-op baseline sometimes useful in user code.
		"""
		job.change(phase=newPhase)

	def flagError(self, newPhase, wjob, exception):
		"""the default action when transitioning to an error: dump exception and
		mark phase as ERROR..
		"""
		wjob.change(phase=ERROR)
		# Validation errors don't get logged -- for one, they probably
		# are the user's fault, and for a second, logging them upsets
		# trial during testing, since trial examines the log.
		if not isinstance(exception, base.ValidationError):
			base.ui.notifyError("Error during UWS execution of job %s"%wjob.jobId)
		wjob.change(error=exception)
		if wjob.endTime is None:
			wjob.change(endTime=datetime.datetime.utcnow())
	
	def noteEndTime(self, newPhase, wjob, ignored):
		wjob.change(endTime=datetime.datetime.utcnow())


class SimpleUWSTransitions(UWSTransitions):
	"""A UWSTransitions with sensible transitions pre-defined.
	
	See the source for what we consider sensible.

	The idea here is that you simply override (and usually up-call)
	the methods queueJob, markAborted, startJob, completeJob,
	killJob, errorOutJob, and ignoreAndLog.

	You will have to define startJob and provide some way to execute
	startJob on QUEUED jobs (there's nothing wrong with immediately
	calling self.startJob(...) if you don't mind the DoS danger).

	Once you have startJob, you'll probably want to define killJob as
	well.
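
	A sketch of a subclass with a trivial, synchronous starter::

		class MyTransitions(SimpleUWSTransitions):
			def startJob(self, newState, wjob, ignored):
				wjob.change(phase=EXECUTING)
				runMyJob(wjob)  # runMyJob is an assumed worker entry point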
	"""
	def __init__(self, name):
		UWSTransitions.__init__(self, name, [
			(PENDING, QUEUED, "queueJob"),
			(PENDING, ABORTED, "markAborted"),
			(QUEUED, ABORTED, "markAborted"),
			(QUEUED, EXECUTING, "startJob"),
			(EXECUTING, COMPLETED, "completeJob"),
			(EXECUTING, ABORTED, "killJob"),
			(EXECUTING, ERROR, "errorOutJob"),
			(COMPLETED, ERROR, "ignoreAndLog"),
			])

	def queueJob(self, newState, wjob, ignored):
		"""puts a job on the queue.
		"""
		wjob.change(phase=QUEUED)

	def markAborted(self, newState, wjob, ignored):
		"""simply marks job as aborted.

		This is what happens if you abort a job from QUEUED or
		PENDING.
		"""
		wjob.change(phase=ABORTED,
			endTime=datetime.datetime.utcnow())

	def ignoreAndLog(self, newState, wjob, exc):
		"""logs an attempt to transition when it's impossible but
		shouldn't result in an error.

		This is mainly so COMPLETED things don't fail just because of some
		mishap.
		"""
		base.ui.logErrorOccurred("Request to push %s job to ERROR: %s"%(
			wjob.phase, str(exc)))

	def errorOutJob(self, newPhase, wjob, exception):
		"""pushes a job to an error state.

		This is called by a worker; leaving the error message itself
		is part of the worker's duty; here, exception will just be logged.
		"""
		wjob.change(phase=newPhase, endTime=datetime.datetime.utcnow())
		self.flagError(newPhase, wjob, exception)

	def killJob(self, newPhase, wjob, ignored):  # pragma: no cover
		"""should abort a job.

		There's really not much we can do here, so this is a no-op.

		Do not up-call here, you'll get a (then spurious) warning
		if you do.
		"""
		base.ui.notifyWarning("%s UWSes cannot kill jobs"%self.name)
	
	def completeJob(self, newPhase, wjob, ignored):
		"""pushes a job into the completed state.
		"""
		wjob.change(phase=newPhase, endTime=datetime.datetime.utcnow())


def _replaceFDs(inFName, outFName):
	"""closes all (findable) file descriptors and replaces stdin with
	inFName and stdout/err with outFName.
	"""
	# This is used for clean forking and doesn't actually belong here.
	# utils.ostricks should take this.
	for fd in range(255, -1, -1):
		try:
			os.close(fd)
		except os.error:
			pass
	inF, outF = open(inFName), open(outFName, "w")
	os.dup(inF.fileno())
	os.dup(outF.fileno())
	os.dup(outF.fileno())


class _UWSBackendProtocol(protocol.ProcessProtocol):
	"""The protocol used for taprunners when spawning them under a twisted
	reactor.
	"""
	def __init__(self, jobId, workerSystem):
		self.jobId = jobId
		self.workerSystem = workerSystem

	def outReceived(self, data):
		base.ui.notifyInfo("UWS worker %s produced output: %s"%(
			self.jobId, data))
	
	def errReceived(self, data):
		base.ui.notifyInfo("UWS worker %s produced an error message: %s"%(
			self.jobId, data))
	
	def processEnded(self, statusObject):
		"""tries to ensure the job is in an admitted end state.
		"""
		try:
			job = self.workerSystem.getJob(self.jobId)
			if job.phase==QUEUED or job.phase==EXECUTING:
				try:
					raise UWSError("Job hung in %s"%job.phase, job.jobId)
				except UWSError as ex:
					self.workerSystem.changeToPhase(self.jobId, ERROR, ex)
		except JobNotFound: # job already deleted
			pass


class ProcessBasedUWSTransitions(SimpleUWSTransitions):
	"""A SimpleUWSTransistions that processes its stuff in a child process.

	Inheriting classes must implement the getCommandLine(wjob) method --
	it must return a command (suitable for reactor.spawnProcess and
	os.execlp and a list of arguments suitable for reactor.spawnProcess.

	They must also implement some sort of queue management.  The the simplest
	case, override queueJob and start the job from there (but set
	to QUEUED in there anyway).
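
	A getCommandLine for a hypothetical worker executable might read::

		def getCommandLine(self, wjob):
			# "myworker" is a made-up binary; args include argv[0] by convention
			return "myworker", ["myworker", wjob.jobId]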
	"""
	# 2019 trial (at least sometimes) starts the reactor too late for
	# us to notice it's running in startJob.  The following attribute
	# lets the testing code force the use of the twisted mechanism.
	trial_forceTwisted = False

	def getCommandLine(self, wjob):
		raise NotImplementedError("%s transitions do not define how"
			" to get a command line"%self.__class__.__name__)

	def _startJobTwisted(self, wjob):
		"""starts a job by forking a new process when we're running 
		within a twisted reactor.
		"""
		assert wjob.phase==QUEUED
		cmd, args = self.getCommandLine(wjob)
		pt = reactor.spawnProcess(_UWSBackendProtocol(wjob.jobId, wjob.uws),
			cmd, args=args,
				env=os.environ)
		wjob.change(pid=pt.pid, phase=EXECUTING)

	def _startJobNonTwisted(self, wjob):
		"""forks off a new process when (hopefully) a manual child reaper 
		is in place.
		"""
		cmd, args = self.getCommandLine(wjob)
		pid = os.fork()
		if pid==0:
			_replaceFDs("/dev/zero", "/dev/null")
			os.execlp(cmd, *args)
		elif pid>0:
			wjob.change(pid=pid, phase=EXECUTING)
		else:
			raise Exception("Could not fork")
	
	def startJob(self, newState, wjob, ignored):
		"""causes a process to be started that executes job.

		This dispatches according to whether or not we are within a twisted
		event loop, mostly for testing support.
		"""
		withinTwisted = self.trial_forceTwisted or reactor.running
		# 2019 trial starts the reactor too late for us to notice we should
		# be using the twisted code.  I hack around it by having the trial
		# code set trial_forceTwisted.
		if withinTwisted:
			return self._startJobTwisted(wjob)
		else:
			return self._startJobNonTwisted(wjob)

	def killJob(self, newState, wjob, ignored):
		"""tries to kill/abort job.

		Actually, there are two different scenarios here: Either the job has
		a non-NULL startTime.  In that case, the child job is in control 
		and will manage the state itself.  Then kill -INT will do the right 
		thing.

		However, if startTime is NULL, the child is still starting up.  Sending
		a kill -INT may do many things, and most of them we don't want.
		So, in this case we kill -TERM the child, do state management ourselves
		and hope for the best.
		"""
		try:
			pid = wjob.pid
			if pid is None:
				raise UWSError("Job is not running")
			elif pid<2:
				raise UWSError("Job has unkillable PID")

			if wjob.startTime is None:
				# the child job is not up yet, kill it brutally and manage
				# state ourselves
				os.kill(pid, signal.SIGTERM)
				self.markAborted(ABORTED, wjob, ignored)
			else:
				# child job is up, can manage state itself
				os.kill(pid, signal.SIGINT)
		except UWSError:
			raise
		except Exception as ex:
			raise UWSError(None, wjob.jobId, sourceEx=ex)