File: executing.py

"""
The execute element and related stuff.
"""

#c Copyright 2008-2020, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import os
import re
import subprocess
import sys
import threading
import traceback

from gavo import base
from gavo.base import cron
from gavo.rscdef import common
from gavo.rscdef import procdef


class GuardedFunctionFactory(object):
	"""a class for making functions safe for cron-like executions.

	The main method is makeGuardedThreaded.  It introduces a lock protecting
	against double execution (if that were to happen, the execution is
	suppressed with a warning; of course, if you fork something into the
	background, that mechanism no longer works).  The callable is run in a
	thread, and exceptions are caught.  If anything goes wrong during
	execution, a mail is sent to the administrator.

	Note that, in contrast to cron, I/O is not captured (that would
	be difficult for threads; we don't want processes because of
	the potential trouble with database connections).

	There's a module-private instance of this that's used by Execute.
	"""
	def __init__(self):
		self.threadsCurrentlyActive = []
		self.activeListLock = threading.Lock()

	def _reapOldThreads(self):
		if len(self.threadsCurrentlyActive)>10:
			base.ui.notifyWarning("There's a suspicious number of cron"
				" threads active (%d).  You should check what's going on."%
				len(self.threadsCurrentlyActive))

		newThreads = []
		with self.activeListLock:
			for t in self.threadsCurrentlyActive:
				if t.is_alive():
					newThreads.append(t)
				else:
					t.join(timeout=0.001)
			self.threadsCurrentlyActive = newThreads

	def makeGuardedThreaded(self, callable, execDef):
		"""returns callable ready for safe cron-like execution.

		execDef is an Execute instance.
		"""
		serializingLock = threading.Lock()
		
		def innerFunction():
			try:
				try:
					execDef.outputAccum = []
					callable(execDef.rd, execDef)
				except Exception:
					base.ui.notifyError("Uncaught exception in timed job %s."
						" Trying to send traceback to the maintainer."%execDef.jobName)
					cron.sendMailToAdmin("DaCHS Job %s failed"%execDef.jobName,
						"".join(traceback.format_exception(*sys.exc_info())))
			finally:
				serializingLock.release()
			if execDef.debug and execDef.outputAccum:
				cron.sendMailToAdmin("Debug output of DaCHS Job %s"%execDef.jobName,
					"\n".join(execDef.outputAccum))
				del execDef.outputAccum

		def cronFunction():
			self._reapOldThreads()
			if not serializingLock.acquire(False):
				base.ui.notifyWarning("Timed job %s has not finished"
					" before next instance came around"%execDef.jobName)
				return
			t = threading.Thread(name=execDef.title, target=innerFunction)
			base.ui.notifyInfo("Spawning thread for cron job %s"%execDef.title)
			t.daemon = True
			t.start()

			with self.activeListLock:
				self.threadsCurrentlyActive.append(t)

			return t

		return cronFunction

_guardedFunctionFactory = GuardedFunctionFactory()
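
# Illustrative sketch (hypothetical variable names): the factory above is used
# further down in Execute.onElementComplete roughly like
#
#   callable = _guardedFunctionFactory.makeGuardedThreaded(compiledJob, execDef)
#   cron.runEvery(3600, jobName, callable)
#
# Each cron tick then calls the returned cronFunction, which refuses to start
# a new thread while a previous run still holds the serializing lock.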


class CronJob(procdef.ProcApp):
	"""Python code for use within execute.

	The resource descriptor this runs within is available as rd, the execute
	definition (with attributes such as title and job, plus any
	properties given in the RD) as execDef.

	Note that no I/O capturing takes place (that's impossible since in
	general the jobs run within the server).  To run external programs the
	way classic cron jobs do, use ``execDef.spawn(["cmd", "arg1"...])``.  On
	failed execution, this sends a mail to the administrator; no exception
	is raised (see the spawn method below).

	In the frequent use case of a resdir-relative python program, you
	can use the ``execDef.spawnPython(pythonFile)`` method.

	If you must stay within the server process, you can do something like::

		mod, _ = utils.loadPythonModule(rd.getAbsPath("bin/coverageplot.py"))
		mod.makePlot()
	
	-- in that way, your code can sit safely within the resource directory
	and you still don't have to manipulate the module path.
	"""
	name_ = "job"
	formalArgs = "rd, execDef"


class Execute(base.Structure, base.ExpansionDelegator):
	"""a container for calling code.

	This provides cron-like functionality.  The jobs are run in separate
	threads, so they need to be thread-safe with respect to the
	rest of DaCHS.  DaCHS serializes calls, though, so your
	code should never run twice at the same time.

	At least on CPython, you must make sure your code does not
	block with the GIL held; this is still in the server process.
	If you do daring things, fork off (note that you must not use
	any database connections you may have after forking, which means
	you can't safely use the RD passed in).  See the docs on `Element job`_.

	When testing/debugging such code, use ``gavo admin execute rd#id``
	to immediately run the jobs.
	"""
	name_ = "execute"

	_title = base.UnicodeAttribute("title",
		default = base.Undefined,
		description="Some descriptive title for the job; this is used"
			" in diagnostics.",
		copyable=False,)

	_at = base.StringListAttribute("at",
		description="One or more hour:minute pairs at which to run"
			" the code each day.  This conflicts with every.  Optionally,"
			" you can prefix each time by one of m<dom> or w<dow> for"
			" jobs only to be exectued at some day of the month or week, both"
			" counted from 1.  So, 'm22 7:30, w3 15:02' would execute on"
			" the 22nd of each month at 7:30 UTC and on every wednesday at 15:02.",
		default=base.NotGiven,
		copyable=True,)

	_every = base.IntAttribute("every",
		default=base.NotGiven,
		description="Run the job roughly every this many seconds."
		"  This conflicts with at.  Note that the first execution of"
		" such a job is after every/10 seconds, and that the timers"
		" start anew at every server restart.  So, if you restart"
		" often, these jobs may run much more frequently or not at all"
		" if the interval is large.  If every is smaller than zero, the"
		" job will be executed immediately when the RD is being loaded and is"
		" then run every abs(every) seconds",
		copyable=True,)

	_job = base.StructAttribute("job",
		childFactory=CronJob,
		default=base.Undefined,
		description="The code to run.",
		copyable=True,)

	_debug = base.BooleanAttribute("debug",
		description="If true, on execution of external processes (span or"
			" spawnPython), the output will be accumulated and mailed to"
			" the administrator.  Note that output of the actual cron job"
			" itself is not caught (it might turn up in serverStderr)."
			" You could use execDef.outputAccum.append(<stuff>) to have"
			" information from within the code included.", default=False)

	_properties = base.PropertyAttribute()

	_rd = common.RDAttribute()

	def spawn(self, cliList):
		"""spawns an external command, capturing the output and mailing it
		to the admin if it failed.

		Output is buffered and mailed, so it shouldn't be too large.

		This does not raise an exception if the command fails (in normal usage,
		this would cause two mails to be sent).  Instead, it returns the 
		returncode of the spawned process; if that's 0, you're ok.  But
		in general, you wouldn't want to check it, as on failure you'll
		already receive a (more informative) mail.
		"""
		p = subprocess.Popen(cliList,
			stdin=subprocess.PIPE, stdout=subprocess.PIPE, 
			stderr=subprocess.STDOUT, close_fds=True)
		childOutput, _ = p.communicate()
		if p.returncode:
			cron.sendMailToAdmin("A process spawned by %s failed with %s"%(
				self.title, p.returncode),
				"Output of %s:\n\n%s"%(cliList, childOutput.decode("ascii", "ignore")))

		elif self.debug:
			if childOutput:
				self.outputAccum.append("\n\n%s -> %s\n"%(cliList, p.returncode))
				self.outputAccum.append(childOutput)

		return p.returncode
	
	def spawnPython(self, pythonFile):
		"""spawns a new python interpreter executing pythonFile.

		pythonFile may be resdir-relative.
		"""
		self.spawn(["python3", os.path.join(self.rd.resdir, pythonFile)])

	def _parseAt(self, atSpec, ctx):
		"""returns a tuple ready for cron.repeatAt from atSpec.

		see the at StringListAttribute for what it should look like; this
		parses one element of that string list.
		"""
		mat = re.match(r"(?P<dow>w\d\s+)?"
			r"(?P<dom>m\d\d?\s+)?"
			r"(?P<hr>\d+):(?P<min>\d+)", atSpec)
		if not mat:
			raise base.LiteralParseError("at", atSpec, pos=ctx.pos, hint=
				"This is hour:minute optionally prefixed by either w<weekday> or"\
				" m<day of month>, each counted from 1.")
		
		hour, minute = int(mat.group("hr")), int(mat.group("min"))
		if not (0<=hour<=23 and 0<=minute<=59):
			raise base.LiteralParseError("at", atSpec, pos=ctx.pos, hint=
				"This must be hour:minute with 0<=hour<=23 or 0<=minute<=59")

		dom = None
		if mat.group("dom"):
			dom = int(mat.group("dom")[1:])
			if not 1<=dom<=28:
				raise base.LiteralParseError("at", atSpec, pos=ctx.pos, hint=
					"day-of-month in at must be between 1 and 28.")

		dow = None
		if mat.group("dow"):
			dow = int(mat.group("dow")[1:])
			if not 1<=dow<=7:
				raise base.LiteralParseError("at", atSpec, pos=ctx.pos, hint=
					"day-of-week in at must be between 1 and 7.")

		return (dom, dow, hour, minute)
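
	# Worked examples for _parseAt (the values follow from the regular
	# expression and the (dom, dow, hour, minute) return order above):
	#
	#   self._parseAt("7:30", ctx)     -> (None, None, 7, 30)
	#   self._parseAt("m22 7:30", ctx) -> (22, None, 7, 30)
	#   self._parseAt("w3 15:02", ctx) -> (None, 3, 15, 2)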

	def completeElement(self, ctx):
		self._completeElementNext(Execute, ctx)
		if len([s for s in [self.at, self.every] if s is base.NotGiven])!=1:
			raise base.StructureError("Exactly one of at and every required"
				" for Execute", pos=ctx.pos)

		if self.at is not base.NotGiven:
			self.parsedAt = []
			for literal in self.at:
				self.parsedAt.append(self._parseAt(literal, ctx))

	def onElementComplete(self):
		self._onElementCompleteNext(Execute)

		self.jobName = "%s in %s"%(self.title, self.rd.sourceId)

		self.callable = _guardedFunctionFactory.makeGuardedThreaded(
			self.job.compile(), self)

		if self.at is not base.NotGiven:
			cron.repeatAt(self.parsedAt, self.jobName, self.callable)
		else:
			cron.runEvery(self.every, self.jobName, self.callable)
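

# Illustrative at-based configuration (a sketch; the attribute values follow
# the _at description above, where w1 would be Monday and w4 Thursday per the
# "counted from 1" rule, and the RD element syntax is an assumption):
#
#   <execute title="weekly cleanup" at="w1 2:00, w4 2:00" debug="True">
#     ... job as sketched above ...
#   </execute>
#
# Per the Execute docstring, such a job can be run immediately for testing
# with ``gavo admin execute rd#id``.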