1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
|
"""
Context Resolver for managing remote contexts.
.. module:: context_resolver
:synopsis: Creates a ContextResolver
.. moduleauthor:: Dave Longley
.. moduleauthor:: Gregg Kellogg <gregg@greggkellogg.net>
"""
from frozendict import frozendict
from c14n.Canonicalize import canonicalize
from pyld import jsonld
from .resolved_context import ResolvedContext
# Limit on the number of remote @context URLs tracked during a single resolve
# operation; `_fetch_context` raises a JsonLdError once the set of visited
# URLs grows past this value.
MAX_CONTEXT_URLS = 10
class ContextResolver:
    """
    Resolves and caches remote contexts.

    Two caches are consulted: `per_op_cache`, a plain dict owned by this
    instance, and `shared_cache`, supplied by the caller and mapping a
    cache key to a `{tag: resolved}` dict.
    """

    def __init__(self, shared_cache, document_loader):
        """
        Creates a ContextResolver.

        :param shared_cache: mapping-like cache (needs `get` and item
          assignment) shared between resolvers.
        :param document_loader: loader handed to `jsonld.load_document` as
          the `documentLoader` option when a remote context is fetched.
        """
        # cache private to this resolver instance
        self.per_op_cache = {}
        self.shared_cache = shared_cache
        self.document_loader = document_loader

    @staticmethod
    def _is_object(value):
        """
        Return True if `value` is a JSON object (a dict or a frozendict).
        """
        return isinstance(value, dict) or isinstance(value, frozendict)

    def resolve(self, active_ctx, context, base, cycles=None):
        """
        Resolve a context.

        :param active_ctx: the current active context.
        :param context: the context to resolve.
        :param base: the absolute URL to use for making url absolute.
        :param cycles: the set of context URLs already fetched during this
          resolve operation, used to detect cycles and to enforce
          MAX_CONTEXT_URLS (default: a new empty set).

        :return: a list of resolved contexts, in document order.
        """
        if cycles is None:
            cycles = set()

        # process `@context`
        if self._is_object(context) and '@context' in context:
            context = context['@context']

        # context is one or more contexts
        if not isinstance(context, list):
            context = [context]

        # resolve each context in the array
        all_resolved = []
        for ctx in context:
            if isinstance(ctx, str):
                resolved = self._get(ctx)
                # Compare against None: a remote context that resolved to an
                # empty list is a valid cache hit and must not be re-fetched
                # (re-fetching would trip the cycle check).
                if resolved is None:
                    resolved = self._resolve_remote_context(
                        active_ctx, ctx, base, cycles)

                # add to output and continue
                if isinstance(resolved, list):
                    all_resolved.extend(resolved)
                else:
                    all_resolved.append(resolved)
            elif ctx is None or ctx is False:
                all_resolved.append(ResolvedContext(False))
            elif not self._is_object(ctx):
                raise jsonld.JsonLdError(
                    'Invalid JSON-LD syntax; @context must be an object.',
                    'jsonld.SyntaxError', {'context': ctx},
                    code='invalid local context')
            else:
                # context is an object, get/create `ResolvedContext` for it
                key = canonicalize(dict(ctx)).decode('UTF-8')
                resolved = self._get(key)
                if resolved is None:
                    # create a new static `ResolvedContext` and cache it
                    resolved = ResolvedContext(ctx)
                    self._cache_resolved_context(key, resolved, 'static')
                all_resolved.append(resolved)

        return all_resolved

    def _get(self, key):
        """
        Look up `key` in the per-operation cache, falling back to the
        'static' entry of the shared cache (which is then copied into the
        per-operation cache).

        :return: the cached value, or None if there is no entry.
        """
        resolved = self.per_op_cache.get(key)
        if resolved is None:
            tag_map = self.shared_cache.get(key)
            if tag_map:
                resolved = tag_map.get('static')
                if resolved is not None:
                    self.per_op_cache[key] = resolved
        return resolved

    def _cache_resolved_context(self, key, resolved, tag):
        """
        Store `resolved` under `key` in the per-operation cache and, when
        `tag` is truthy, also under `shared_cache[key][tag]`.

        :return: `resolved`, unchanged.
        """
        self.per_op_cache[key] = resolved
        if tag:
            tag_map = self.shared_cache.get(key)
            if not tag_map:
                tag_map = {}
                self.shared_cache[key] = tag_map
            tag_map[tag] = resolved
        return resolved

    def _resolve_remote_context(self, active_ctx, url, base, cycles):
        """
        Fetch the context at `url` (made absolute against `base`), resolve
        it recursively, cache the result under the absolute URL and return
        it.
        """
        # resolve relative URL and fetch context
        url = jsonld.prepend_base(base, url)
        context, remote_doc = self._fetch_context(active_ctx, url, cycles)

        # update base according to remote document and resolve any relative
        # URLs
        base = remote_doc.get('documentUrl', url)
        self._resolve_context_urls(context, base)

        # resolve, cache, and return context
        resolved = self.resolve(active_ctx, context, base, cycles)
        self._cache_resolved_context(url, resolved, remote_doc.get('tag'))
        return resolved

    def _fetch_context(self, active_ctx, url, cycles):
        """
        Load the remote document at `url` and extract its `@context`.

        :param active_ctx: the current active context (only its
          'processingMode' is read, to pick the error code).
        :param url: the absolute URL of the context to fetch.
        :param cycles: the set of URLs already fetched; `url` is added to it.

        :return: a tuple `(context, remote_doc)` where `context` is a new
          dict of the form `{'@context': ...}`.

        :raises jsonld.JsonLdError: if too many URLs were fetched, a cycle is
          detected, loading fails, or the document is not a JSON object.
        """
        # check for max context URLs fetched during a resolve operation
        if len(cycles) > MAX_CONTEXT_URLS:
            raise jsonld.JsonLdError(
                'Maximum number of @context URLs exceeded.',
                'jsonld.ContextUrlError', {'max': MAX_CONTEXT_URLS},
                code=('loading remote context failed'
                      if active_ctx.get('processingMode') == 'json-ld-1.0'
                      else 'context overflow'))

        # check for context URL cycle
        # shortcut to avoid extra work that would eventually hit the max above
        if url in cycles:
            raise jsonld.JsonLdError(
                'Cyclical @context URLs detected.',
                'jsonld.ContextUrlError', {'url': url},
                code=('recursive context inclusion'
                      if active_ctx.get('processingMode') == 'json-ld-1.0'
                      else 'context overflow'))

        # track cycles
        cycles.add(url)

        try:
            remote_doc = jsonld.load_document(
                url,
                {'documentLoader': self.document_loader},
                requestProfile='http://www.w3.org/ns/json-ld#context')
            context = remote_doc.get('document', url)
        except Exception as cause:
            # chain the original exception so the traceback keeps it
            raise jsonld.JsonLdError(
                'Dereferencing a URL did not result in a valid JSON-LD object. ' +
                'Possible causes are an inaccessible URL perhaps due to ' +
                'a same-origin policy (ensure the server uses CORS if you are ' +
                'using client-side JavaScript), too many redirects, a ' +
                'non-JSON response, or more than one HTTP Link Header was ' +
                'provided for a remote context.',
                'jsonld.InvalidUrl',
                {'url': url, 'cause': cause},
                code='loading remote context failed') from cause

        # ensure ctx is an object
        if not self._is_object(context):
            raise jsonld.JsonLdError(
                'Dereferencing a URL did not result in a JSON object. The ' +
                'response was valid JSON, but it was not a JSON object.',
                'jsonld.InvalidUrl',
                {'url': url},
                code='invalid remote context')

        # use empty context if no @context key is present
        context = {'@context': context.get('@context', {})}

        # append @context URL to context if given; a loader result without a
        # 'contextUrl' key is treated the same as an empty one
        context_url = remote_doc.get('contextUrl')
        if context_url:
            local = context['@context']
            if not isinstance(local, list):
                local = [local]
            # build a new list so the loader's (possibly cached) document is
            # not grown on every fetch
            context['@context'] = list(local) + [context_url]

        return (context, remote_doc)

    def _resolve_context_urls(self, context, base):
        """
        Resolve all relative `@context` URLs in the given context by inline
        replacing them with absolute URLs.

        :param context: the context.
        :param base: the base IRI to use to resolve relative IRIs.
        """
        if not self._is_object(context):
            return

        ctx = context.get('@context')

        if isinstance(ctx, str):
            # NOTE(review): item assignment would fail if `context` is a
            # frozendict (e.g. a nested term definition) -- confirm whether
            # that can occur here.
            context['@context'] = jsonld.prepend_base(base, ctx)
            return

        if isinstance(ctx, list):
            for num, element in enumerate(ctx):
                if isinstance(element, str):
                    ctx[num] = jsonld.prepend_base(base, element)
                elif self._is_object(element):
                    self._resolve_context_urls({'@context': element}, base)
            return

        if not self._is_object(ctx):
            # no @context URLs can be found in non-object
            return

        # ctx is an object, resolve any context URLs in its term definitions
        for definition in ctx.values():
            self._resolve_context_urls(definition, base)
|