File: jsonserializer.py

package info (click to toggle)
python-pyrdfa 3.5.2%2B20220621~ds-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 584 kB
  • sloc: python: 5,386; makefile: 4; sh: 2
file content (409 lines) | stat: -rw-r--r-- 16,366 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
# -*- coding: utf-8 -*-
#
# Copyright (c) 2011 Daniel Gerber.
#
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU General Public License for more details.
#
#You should have received a copy of the GNU General Public License
#along with this program. If not, see <http://www.gnu.org/licenses/>.

# Modified by Ivan Herman, 2012, to produce a JSON-LD serialization: http://json-ld.org/spec/latest/json-ld-syntax
"""
 $Id: jsonserializer.py,v 1.4 2012-12-05 19:18:17 ivan Exp $
"""
import sys

if sys.version_info[1] >= 7 :
	from collections import OrderedDict
else :
	from pyRdfaExtras.extras.odict import odict as OrderedDict

from rdflib.serializer import Serializer
from rdflib.term import URIRef, BNode, Literal
from rdflib	import RDF  as ns_rdf
from rdflib	import RDFS as ns_rdfs

RDFA_VOCAB = URIRef("http://www.w3.org/ns/rdfa#usesVocabulary")

class JsonSerializer(Serializer):
	"""Serializer producing a JSON-LD rendering of an RDFLib graph.

	The serializer builds an order preserving dictionary mirroring the
	desired JSON-LD output (a possible C{@context}, then the top level
	subjects, wrapped in C{@graph} when there are several) and dumps it
	via the C{json} module.  Several "beautification" steps are applied:
	blank nodes used exactly once are embedded in their parent object,
	RDF collections become C{@list} arrays, predicates whose objects are
	always references are coerced to C{@id} in the context, and terms of
	a single RDFa C{@vocab} are emitted without prefix.
	"""
	# Predicates with a special JSON-LD mapping; they must never appear in
	# the coerced ("@type": "@id") entries of the generated context
	non_coerced_predicates = [ ns_rdf["type"], ns_rdf["first"], ns_rdf["rest"] ]
	# Datatypes that JSON can encode natively; such literals are emitted as
	# plain JSON numbers/booleans instead of @value/@type objects
	automatic_datatypes    = [ URIRef("http://www.w3.org/2001/XMLSchema#integer"),
							   URIRef("http://www.w3.org/2001/XMLSchema#double"),
							   URIRef("http://www.w3.org/2001/XMLSchema#boolean")
							 ]

	def __init__(self, graph):
		"""
		@param graph: the RDFLib Graph instance to be serialized
		"""
		self.graph          = graph
		# Coerce structure. Is filled through _initialize_predicates()
		self.coerce         = OrderedDict()
		self.prefix_map     = PrefixMap(self.graph.namespaces())
		# Subjects that should appear on top. Initialized to all subjects,
		# trimmed in _initialize_subjects() and _object()
		self.top_subjects   = set(self.graph.subjects())
		# Dictionary mapping subjects to their JSON structures
		self.all_subjects   = OrderedDict()
		# Subjects that may appear as intermediate objects in a chain
		# (ie, subjects referred to by exactly one other triple)
		self.chain_links    = set()
		# Predicates whose range in this graph are URI References or Blank Nodes only
		self.uri_predicates = set()
		# Single vocabulary, as generated by an RDFa processor
		self.vocab          = None
		# The 'owner' of the vocabulary, ie, the subject of the vocab
		# setting triple (if there is only one such triple)
		self.vocab_owner    = None
		# Vocabulary terms, ie, terms that can be represented as part of the vocab
		self.vocabulary_terms = {}
		# Maps a list head to its (content, building-block nodes) pair
		self.lists          = {}
		# This is to keep the ancestor class (Serializer) quiet
		self.base = None

	def serialize(self, stream, base=None, encoding='utf-8', **kwds):
		"""Generic entry point for the serialization, as used by RDFLib.
		The serializer uses order preserving dictionaries to keep the
		right/expected order within the JSON-LD output, too.

		@param stream: output stream receiving the JSON text
		@param base: unused; present for RDFLib API compatibility only
		@param encoding: fallback encoding, used only if the stream
		rejects the unencoded string
		"""
		if encoding is None : encoding = 'utf-8'
		# Create the dictionary to be serialized
		d = self._build(base=base, **kwds)

		# json is in the standard library from Python 2.6 onwards; fall
		# back on simplejson for older interpreters
		if sys.version_info[1] >= 6 :
			import json
			s = json.dumps(d, ensure_ascii = False, indent=4)
		else :
			import simplejson
			s = simplejson.dumps(d, ensure_ascii = False, indent=4)

		try :
			stream.write(s)
		except UnicodeEncodeError :
			# Some streams insist on receiving encoded bytes
			stream.write(s.encode(encoding))

	def _build(self, base=None, prefix_map=None, encode_literal=None, **kwds):
		"""Returns an ordered dict to serialize. The base is, in fact, unused, it
		is only here because it appears in the rest of the RDFLib call structure...

		@param prefix_map: optional extra prefix -> namespace entries
		@param encode_literal: optional callable overriding the default
		literal encoding (must accept a Literal and return a json-serializable)
		"""
		if encode_literal:
			assert callable(encode_literal)
			self._encode_literal = encode_literal

		if prefix_map:
			self.prefix_map.update(prefix_map)

		# Subjects: find the possible 'top level' subjects, possible links; result is a
		# dictionary referring to the json objects to produce
		self._initialize_subjects()
		# Predicates: find the predicates that can safely be coerced as producing URI-s
		self._initialize_predicates()
		# Name tells it all: find possible list structures in the graph and treat them separately
		self._initialize_lists()
		# Find out if the @vocab has been used in the graph; if so, and there is only one,
		# the term usage is reproduced in the JSON LD output
		self._rdfa_vocabulary_usage()

		# Fill in the content of the json objects
		for s in self.all_subjects.keys() :
			# List headers are treated specially
			if s in self.lists : continue

			# Get the subject structure
			subj = self.all_subjects[s]
			# Get the possible types, and encode them through the special json-ld syntax
			types = [ t for t in self.graph.objects(s, ns_rdf["type"]) ]
			if len(types) == 1 :
				# BUGFIX: the original referenced the leaked comprehension
				# variable 't' here, which is a NameError on Python 3
				subj["@type"] = self._get_node_ref(types[0])
			elif len(types) > 1 :
				subj["@type"] = [ self._get_node_ref(t) for t in types ]

			# Get the other properties and their objects
			for p in self.graph.predicates(s) :
				# Types have already been taken care of
				if p == ns_rdf["type"] : continue

				pobj = self._predicate(p)
				objs = [ o for o in self.graph.objects(s, p) ]

				# The cardinality of objs makes a difference in the output format...
				if len(objs) == 0 :
					# Should not happen, though
					continue
				elif len(objs) == 1 :
					subj[pobj] = self._object(objs[0], s, p)
				else :
					subj[pobj] = [ self._object(ob, s, p) for ob in objs ]

		#######################################################################
		# Yet another beautification: if a top level object has no parent and is
		# a blank node, the @id is unnecessary.
		for s in self.top_subjects :
			if isinstance(s, BNode) :
				if len([ p for (p, x, y) in self.graph.triples((None, None, s)) ]) == 0 :
					self.all_subjects[s].pop("@id", None)

		#######################################################################
		# Put all together now in the top level object, ie, the one that produces the output
		_json_obj = OrderedDict()

		# Add the @context part, if needed
		if len(self.prefix_map.used_keys) > 0 or len(self.vocabulary_terms) > 0 or len(self.uri_predicates) > 0 :
			context = OrderedDict()
			predicate_handled = set()

			# Add the context for CURIE-s.
			# If the predicate has been identified as producing URI references
			# only, the "@type": "@id" coercion is added here
			for k, v in self.graph.namespaces() :
				if k in self.prefix_map.used_keys :
					predicate_handled.add(v)
					if v in self.uri_predicates :
						typ = OrderedDict()
						typ['@id']   = "%s" % v
						typ['@type'] = '@id'
						context[k] = typ
					else :
						context[k] = "%s" % v

			# Add the context for predicates that originate from an RDFa @vocab construct.
			# If the predicate has been identified as producing URI references
			# only, the coercion is taken care of here, too
			for k, v in self.vocabulary_terms.items() :
				predicate_handled.add(v)
				if v in self.uri_predicates :
					typ = OrderedDict()
					typ['@id']   = "%s" % v
					typ['@type'] = '@id'
					context[k] = typ
				else :
					context[k] = "%s" % v

			# Some predicates might have been used as full URI-s, but should still be
			# added for type coercion if they produce URI references only
			for p in self.uri_predicates :
				if p not in predicate_handled :
					typ = OrderedDict()
					typ['@type'] = '@id'
					cp = self.prefix_map.shrink(p)
					context[cp if cp is not None else p] = typ

			# Context is done
			_json_obj["@context"] = context

		# Add the top level objects; the content of these have been filled by the
		# previous steps. There is a big difference on whether there are several
		# top level objects or not; in the former case the special JSON-LD
		# "@graph" keyword has to be used as a key
		if len(self.top_subjects) == 1 :
			subj = self.all_subjects[self.top_subjects.pop()]
			for k in subj.keys() :
				_json_obj[k] = subj[k]
		elif len(self.top_subjects) > 1 :
			_json_obj["@graph"] = [ self.all_subjects[s] for s in self.top_subjects ]

		return _json_obj

	def _initialize_subjects(self) :
		"""Collect the various subject categories: initial top level list,
		chain links; plus a dictionary mapping subjects to their json structure.
		"""
		# All subjects are represented by a json object; store those, too
		# (initially only holding the @id, the rest is filled in by _build).
		# The initial list of subjects is also stored in the top_subjects set,
		# ie, the subjects that will appear on top of the serialized json output
		for s in self.top_subjects :
			js_struct = OrderedDict()
			# This may have to be refined to remove blank nodes in a chain link...
			js_struct["@id"] = self._get_node_ref(s)
			self.all_subjects[s] = js_struct

		# Now the chains have to be found
		for s in self.top_subjects :
			# See the number of parents, ie, the number of triples for which this
			# subject appears as an object. If the number is exactly one, this
			# subject may appear embedded in a chaining structure
			if len([ p for (p, x, y) in self.graph.triples((None, None, s)) ]) == 1 :
				self.chain_links.add(s)

	def _initialize_predicates(self) :
		"""Collect all those predicates whose objects are URIRefs or BNodes only.
		This collection is used to simplify the output via JSON-LD's coercion facility.
		"""
		for p in set(self.graph.predicates()) :
			if p in self.non_coerced_predicates : continue
			if not any(isinstance(o, Literal) for (s, x, o) in self.graph.triples((None, p, None))) :
				self.uri_predicates.add(p)

	def _initialize_lists(self) :
		"""Find RDF collections in the graph and store them, so that they can be
		serialized via the JSON-LD C{@list} abbreviation."""
		def get_heads(l) :
			# Collect the rdf:rest "building block" nodes of a list, head excluded
			retval = []
			nl = l
			while True :
				h = self.graph.value(nl, ns_rdf["rest"])
				if h != ns_rdf["nil"] :
					retval.append(h)
					nl = h
				else :
					break
			return retval

		for s in self.all_subjects.keys() :
			# See if this is a possible list in the first place
			if self.graph.value(s, ns_rdf["first"]) is not None and self.graph.value(s, ns_rdf["rest"]) is not None :
				if len([ p for p, x, y in self.graph.triples((None, ns_rdf["rest"], s)) ]) == 0 :
					# Yep, this is the head of a list!
					# Let us collect the list. First the array of list content:
					content = [ c for c in self.graph.items(s) ]
					heads   = get_heads(s)
					# Not a clean list (a building block is shared or is not a
					# blank node): skip this head only. BUGFIX: the original
					# used 'break' here, silently abandoning all remaining
					# list candidates as well
					if not all(isinstance(h, BNode) and h in self.chain_links for h in heads) : continue
					self.lists[s] = (content, heads)

		# Now we will have to massage the lists and the subject informations...
		for s in self.lists :
			content, heads = self.lists[s]

			# First the corresponding structure for the list head should change,
			# turning it into the JSON-LD abbreviation for lists
			lst = OrderedDict()
			lst["@list"] = [ self._object(c, None, None) for c in content ]
			self.all_subjects[s] = lst

			# The heads, ie, the list building blocks, should be removed from further processing
			for h in heads :
				self.all_subjects.pop(h, None)
				self.top_subjects.discard(h)

	def _get_node_ref(self, ident):
		"""Returns the property name / local identifier for this node: a vocab
		term, a CURIE, a (relativized) URI, or a blank node identifier.

		@raise TypeError: if C{ident} is neither a URIRef nor a BNode
		"""
		if isinstance(ident, URIRef):
			if self.vocab and ident.startswith(self.vocab) :
				term = ident.replace(self.vocab, '', 1)
				# Let us not create an empty term if the vocabulary URI itself is referred to anywhere...
				if term == "" : return ident
				self.vocabulary_terms[term] = ident
				return term
			else :
				return self.prefix_map.shrink(ident) or self.relativize(ident)
		elif isinstance(ident, BNode):
			return "_:%s" % ident
		raise TypeError('Expected URIRef or BNode instance, got %r' % ident)

	# Predicates are encoded exactly the same way as node references
	_predicate = _get_node_ref

	def _object(self, ident, parent, predicate):
		"""Returns a json-serializable structure for the object of a triple.

		@param ident: the object (Literal, URIRef or BNode)
		@param parent: the triple's subject (or None for list content)
		@param predicate: the triple's predicate (or None for list content)
		"""
		if isinstance(ident, Literal):
			return self._encode_literal(ident)
		else :
			# This is a bnode or a URI ref. We have to see if it is a possible chain link
			if ident in self.chain_links :
				# Yep!
				# This object should be removed from the list of top level subjects
				self.top_subjects.discard(ident)
				# The parent must be removed from the chain_links, if there...
				if parent : self.chain_links.discard(parent)
				# The json structure of this object should be linked from the
				# parent; the content will be filled later
				jsubj = self.all_subjects[ident]
				# If this happens to be a BNode, the BNode id is unnecessary
				if isinstance(ident, BNode) :
					jsubj.pop("@id", None)
				return jsubj
			else :
				# No, this is either a leaf or a subject with multiple parents
				if predicate and predicate in self.uri_predicates :
					# The coercion rules in the context take care of marking
					# this string as a reference for this predicate
					if isinstance(ident, BNode) :
						return "_:%s" % ident
					else :
						return self._get_node_ref(ident)
				else :
					retval = OrderedDict()
					if isinstance(ident, BNode) :
						retval["@id"] = "_:%s" % ident
					else :
						retval["@id"] = self._get_node_ref(ident)
					return retval

	def _encode_literal(self, literal):
		"""Produce either a plain string as a literal, a native JSON value, or a
		@value object with datatype or language."""
		if literal.datatype is not None :
			retval = OrderedDict()
			if literal.datatype in JsonSerializer.automatic_datatypes :
				try :
					# Numbers and booleans can be encoded as native JSON values
					return literal.toPython()
				except Exception :
					# Ill-formed lexical value: fall back on the explicit form
					retval["@value"] = literal
					retval["@type"]  = self._get_node_ref(literal.datatype)
			else :
				retval["@value"] = literal
				retval["@type"]  = self._get_node_ref(literal.datatype)
			return retval
		elif literal.language is not None and literal.language != "" :
			retval = OrderedDict()
			retval["@value"]    = literal
			retval["@language"] = literal.language
			return retval
		else :
			return literal

	# This part is very specific to RDFa usage!
	def _rdfa_vocabulary_usage(self) :
		"""See if a vocabulary (rdfa:usesVocabulary) has been used at all and,
		if yes, whether there is only one; in that case its terms can be emitted
		without prefix."""
		vocabs = [ (s, p, o) for s, p, o in self.graph.triples((None, RDFA_VOCAB, None)) ]
		# There may be a double management of vocab and namespace (schema.org is
		# a typical case). The vocab approach should prevail, otherwise problems
		# occur in the @context of the generated JSON-LD with the keys
		for v in vocabs :
			self.prefix_map.remove_vocab(v[2])

		# If there is no vocab, or if there is more than one, there is nothing we
		# can do to beautify (in the multiple vocabulary case, there may be
		# identical terms used in both vocabularies, and that can lead to a mess)
		if len(vocabs) == 1 :
			self.vocab       = vocabs[0][2]
			self.vocab_owner = vocabs[0][0]
			# NOTE(review): an earlier version also removed the vocab-setting
			# triple from the output when its subject appeared nowhere else;
			# that was deemed to alter the data and was deliberately dropped.

class PrefixMap(dict):
	"""A mapping of prefixes to URIs.

	The dict itself holds the prefixes actually put to use; C{parent} holds
	the full set of candidate prefixes (e.g. the graph's namespaces), from
	which entries are copied into the map on first use by L{shrink}.
	"""

	def __init__(self, parent=None, *args, **kwds):
		"""
		@param parent: optional iterable of (prefix, namespace) pairs used as
		a fallback lookup; a missing or empty parent is normalized to None
		"""
		dict.__init__(self, *args, **kwds)
		# Normalize an empty parent mapping to None (mirrors the original
		# 'parent and dict(parent) or None' behaviour exactly)
		self.parent     = (dict(parent) or None) if parent else None
		# Prefixes that have actually been used to shrink a URI; the caller
		# inspects this to decide which prefixes go into the @context
		self.used_keys  = set()
		# Namespace URIs to be skipped because they duplicate an RDFa @vocab
		self.vocab_uris = set()

	def remove_vocab(self, uriref) :
		"""Mark a namespace URI as an RDFa vocabulary URI; shrink() will then
		ignore it, so that @vocab terms do not clash with CURIEs."""
		self.vocab_uris.add(uriref)

	def shrink(self, uriref):
		"""Returns a CURIE or None.

		Prefixes found only in the parent map are copied into this map on
		first use. BUGFIX: uses dict.items() instead of the Python-2-only
		iteritems(), which raised AttributeError on Python 3.
		"""
		for pfx, ns in self.items():
			if ns not in self.vocab_uris and uriref.startswith(ns):
				self.used_keys.add(pfx)
				return '%s:%s' % (pfx, uriref.replace(ns, '', 1))
		if self.parent:
			for pfx, ns in self.parent.items():
				if ns not in self.vocab_uris and uriref.startswith(ns):
					# Promote the prefix from the parent into the active map
					self[pfx] = ns
					self.used_keys.add(pfx)
					return '%s:%s' % (pfx, uriref.replace(ns, '', 1))

	def resolve(self, curie):
		"""Returns an URIRef or None (when the prefix is unknown)."""
		prefix, s, relative_ref = curie.partition(':')
		if prefix in self:
			return URIRef(self[prefix] + relative_ref)
		elif self.parent and prefix in self.parent:
			return URIRef(self.parent[prefix] + relative_ref)