File: datapack.py

"""
Dumping our resources to frictionless data packages (henceforth: datapack)
and loading from them again.

Specifications: 

* https://specs.frictionlessdata.io/data-package/
* https://specs.frictionlessdata.io/data-resource/

Datapacks produced by DaCHS can be recognised by the presence of
a dachs-resdir key in the global metadata.  Also, we will always
write the RD as the first resource; for good measure, we also mark
it by giving it the name dachs-resource-descriptor.
"""

import glob
import json
import os
import shutil
import subprocess
import zipfile
from typing import Callable, Generator

from gavo import base
from gavo import rscdef
from gavo import rscdesc
from gavo import utils


def _perhaps(d:dict, key:str, val) -> None:
	"""adds key: val to d unless val is false-y (e.g., empty or None).
	"""
	if val:
		d[key] = val


def makeBasicMeta(rd:rscdesc.RD) -> dict:
	"""returns a basic, resource-less, datapack descriptor from an RD.
	"""
	res = {
		"dachs-resdir": 
			utils.getRelativePath(rd.resdir, base.getConfig("inputsDir")),
		"name": rd.schema,
		"homepage": base.makeAbsoluteURL("/browse/"+rd.sourceId),
		# TODO: We'd like to have a custom profile for DaCHS.  We'll
		# need to put in a json schema for that, though.  Look into it.
		"profile": "data-package",
	}

	_perhaps(res, "id", base.getMetaText(rd, "doi", acceptSequence=True))

	licenses = []
	for item in rd.iterMeta("rights"):
		try:
			uriMeta = base.getMetaText(item, "rightsURI")
			if uriMeta:
				licenses.append(
					{"path": str(uriMeta)})
		except base.NoMetaKey:
			pass
	_perhaps(res, "licenses", licenses)

	contributors = []
	for item in rd.iterMeta("creator"):
		contributors.append({
			"title": base.getMetaText(item, "name", "Unknown"),
			"role": "author"
		})
	_perhaps(res, "contributors", contributors)

	subjects = []
	for item in rd.iterMeta("subject"):
		subjects.append(str(item))
	_perhaps(res, "keywords", subjects)

	for ourKW, theirKW in [
			("title", None),
			("description", None),
			("version", None),
			("creationDate", "created")]:
		_perhaps(res, theirKW or ourKW, 
			base.getMetaText(rd, ourKW, acceptSequence=True))

	return res


def namer(template:str) -> Generator[str, None, None]:
	"""yields an unbounded sequence of names made by filling template
	with 0, 1, 2, ...
	"""
	i = 0
	while True:
		yield template%i
		i += 1

def iterExtraResources(rd:rscdesc.RD, cleanPath:Callable[[str], str]
		) -> Generator[dict, None, None]:
	"""yields datapack resources from the datapack-extrafiles property.

	This is a json sequence, and files are only returned if they exist.
	Directories are ignored.
	"""
	makeName = namer("extrafile-%04d")
	pathLiteral = rd.getProperty("datapack-extrafiles", None)
	if pathLiteral is None:
		return

	patterns = json.loads(pathLiteral)

	for pattern in patterns:
		for match in glob.glob(rd.getAbsPath(pattern)):
			if os.path.isfile(match):
				yield {
					"name": next(makeName),
					"profile": "data-resource",
					"path": cleanPath(match)
				}
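
# The datapack-extrafiles property holds a JSON array of glob patterns
# interpreted relative to the resource directory.  As an illustrative
# example (the file names are invented), an RD might set it to
#
#   ["docs/*.pdf", "bin/calibrate.py"]
#
# which makes iterExtraResources yield one extrafile-NNNN resource per
# matching regular file.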


def iterRDResources(rd:rscdesc.RD) -> Generator[dict, None, None]:
	"""yields datapack resource descriptions for the RD and all
	ancillary files we can discover.

	All path names here are relative to the resource directory.  Anything
	outside of the resource directory will not be exported (without serious
	trickery, that is).
	"""
	resdir = os.path.normpath(rd.resdir)
	def cleanPath(path):
		return utils.getRelativePath(path, resdir, liberalChars=True)

	# the RD itself
	yield {
		"name": "dachs-resource-descriptor",
		"path": cleanPath(rd.srcPath),
		"profile": "data-resource",
		"title": "The DaCHS resource descriptor",
		"mediatype": "text/xml",
	}

	# a README, if it's there
	if os.path.exists(rd.getAbsPath("README")):
		yield {
			"name": "readme",
			"path": "README",
			"profile": "data-resource",
			"mediatype": "text/plain",
		}
	
	# Manually added files
	for res in iterExtraResources(rd, cleanPath):
		yield res
	
	# extra instrumentation in getRDForDump creates the filesLoaded
	# attribute, which we use to add ancillary files used by the RD
	# TODO: catch a few more and add them in getRDForDump
	makeName = namer("rdaux-%02d")

	# filesLoaded is de-duplicated here because, e.g., multiple data
	# elements might use the same custom grammar.
	for extra in sorted(set(rd.filesLoaded)):
		fullPath = os.path.join(rd.resdir, extra)
		# hack: external python modules drop the .py in the attribute value
		if not os.path.exists(fullPath) and os.path.exists(fullPath+".py"):
			extra = extra+".py"

		yield {
			"name": next(makeName),
			"profile": "data-resource",
			"path": extra,
		}

	# files imported
	makeName = namer("data-%05d")
	for dd in rd.dds:
		if dd.sources and dd.sources.ignoredSources.ignoresVaryingStuff():
			base.ui.notifyWarning(
				"data %s#%s ignored because of dynamic ignoreSources"%(
					rd.sourceId, dd.id))
			continue

		for source in dd.iterSources():
			if isinstance(source, str) and os.path.exists(source):
				yield {
					"name": next(makeName),
					"profile": "data-resource",
					"path": cleanPath(source)}

			else:
				# it's probably some artificial source token; don't even
				# bother to report that anything looks weird.
				pass


def makeDescriptor(rd:rscdesc.RD) -> dict:
	"""returns a datapack descriptor in a python dictionary.
	"""
	desc = makeBasicMeta(rd)
	desc["resources"] = list(iterRDResources(rd))
	return desc


def getRDForDump(rdId:str) -> rscdesc.RD:
	"""loads an RD for later dumping.

	The main thing this does is instrument ResdirRelativeAttribute
	(and possibly later other things) to record what ancillary
	data the RD has loaded.

	This is, of course, not thread-safe or anything, and it could collect
	false positives when RDs reference or include other RDs.

	Only use it while making datapacks.
	"""
	origParse = rscdef.ResdirRelativeAttribute.parse
	filesLoaded = []

	def record(self, val):
		filesLoaded.append(val)
		return origParse(self, val)

	try:
		rscdef.ResdirRelativeAttribute.parse = record
		rd = rscdesc.getRD(rdId)
		rd.filesLoaded = filesLoaded
	finally:
		rscdef.ResdirRelativeAttribute.parse = origParse

	return rd


def dumpPackage(rdId:str, destFile) -> None:
	"""write a zip of the complete data package for a resource descriptor
	to destFile.

	destFile an be anything that zip.ZipFile accepts in w mode.
	"""
	with zipfile.ZipFile(destFile, "w") as dest:
		rd = getRDForDump(rdId)
		descriptor = makeDescriptor(rd)
		dest.writestr("datapackage.json", json.dumps(descriptor))
		
		for rsc in descriptor["resources"]:
			dest.write(
				os.path.join(rd.resdir, rsc["path"]),
				rsc["path"])


def getPackageMeta(packageName:str) -> dict:
	"""returns a dict of DaCHS-specific metadata items from a DaCHS-produced
	data package.
	"""
	res = {}
	try:
		with zipfile.ZipFile(packageName, "r") as archive:
			with archive.open("datapackage.json") as f:
				packageMeta = json.load(f)

			if "dachs-resdir" not in packageMeta:
				raise ValueError("Need dachs-resdir key in package meta")
			res["resdir"] = packageMeta["dachs-resdir"]
		
			try:
				res0 = packageMeta["resources"][0]
			except (IndexError, KeyError):
				raise ValueError("Data package without resources")

			if res0["name"]!="dachs-resource-descriptor":
				raise ValueError("First data package resource isn't a DaCHS RD")
			
			res["rdpath"] = res0["path"]
	except Exception as ex:
		raise base.ui.logOldExc(
			base.ReportableError("%s is not a data package produced by DaCHS"
			" (or, if it was, DaCHS is broken and you should"
			" complain)"%packageName,
			hint="Here is what failed in the background: %s"%ex))

	return res
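
# For a package written by dumpPackage above, getPackageMeta returns
# something like the following (values are illustrative only):
#
#   {"resdir": "arihip", "rdpath": "q.rd"}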


@utils.exposedFunction([
	utils.Arg("id", help="Absolute RD id to produce a datapack for"),
	utils.Arg("dest", help="Name of a zip file to dump to")],
	help="Produce a frictionless data package for the RD and the data"
		" it imports.")
def create(args):
	with open(args.dest, "wb") as dest:
		dumpPackage(args.id, dest)


@utils.exposedFunction([
	utils.Arg("source", help="Name of a DaCHS-produced data package zip file."),
	utils.Arg("-t", "--no-test", help="Do not run tests after importing.",
		dest="suppressTests", action="store_true"),
	utils.Arg("--force", help="If the resdir the package declares already"
		" exists, remove it before unpacking (rather than bailing out).",
		dest="removeResdir", action="store_true"),],
	help="Load and import a data package.  This only works for data packages"
		" actually produced by DaCHS.")
def load(args):
	packageMeta = getPackageMeta(args.source)

	absSource = os.path.abspath(args.source)
	os.chdir(base.getConfig("inputsDir"))
	if os.path.exists(packageMeta["resdir"]):
		if args.removeResdir:
			shutil.rmtree(packageMeta["resdir"])
		else:
			raise base.ReportableError(
				"Refusing to overwrite directory '%s'"%packageMeta["resdir"],
				hint="The data package is for a resource living in this resource"
					" directory, and it can only run there.  However, that directory"
					" already exists.  Move it away and try again.")
	base.makeSharedDir(packageMeta["resdir"])
	os.chdir(packageMeta["resdir"])

	subprocess.run(["unzip", absSource], check=True)

	subprocess.run(["dachs", "imp", packageMeta["rdpath"]], check=True)
	if not args.suppressTests:
		subprocess.run(["dachs", "test", "-v", packageMeta["rdpath"]])


def main():
	"""does the cli interaction.
	"""
	args = utils.makeCLIParser(globals()).parse_args()
	args.subAction(args)