1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
|
# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Validate VO Services."""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from ...extern import six
from ...extern.six.moves import map
# STDLIB
import multiprocessing
import os
import warnings
from collections import OrderedDict
# LOCAL
from .exceptions import (ValidationMultiprocessingError,
InvalidValidationAttribute)
from ..client import vos_catalog
from ..client.exceptions import VOSError
from ...io import votable
from ...io.votable.exceptions import E19
from ...io.votable.validator import html, result
from ...logger import log
from ...utils import data
from ...utils.exceptions import AstropyUserWarning
from ...utils.timer import timefunc
from ...utils.xml.unescaper import unescape_all
# Temporary solution until STScI VAO registry formally provides
# <testQuery> tags
from .tstquery import parse_cs
__all__ = ['check_conesearch_sites']
@timefunc(1)
def check_conesearch_sites(destdir=os.curdir, verbose=True, parallel=True,
                           url_list='default'):
    """Validate Cone Search Services.

    .. note::

        URLs are unescaped prior to validation.

        Only check queries with ``<testQuery>`` parameters.
        Does not perform meta-data and erroneous queries.

    Parameters
    ----------
    destdir : str, optional
        Directory to store output files. Will be created if does
        not exist. Existing files with these names will be deleted
        or replaced:

            * conesearch_good.json
            * conesearch_warn.json
            * conesearch_exception.json
            * conesearch_error.json

    verbose : bool, optional
        Print extra info to log.

    parallel : bool, optional
        Enable multiprocessing.

    url_list : list of string, optional
        Only check these access URLs against
        `astropy.vo.validator.Conf.conesearch_master_list` and ignore
        the others, which will not appear in output files. By
        default, check those in
        `astropy.vo.validator.Conf.conesearch_urls`. If `None`, check
        everything.

    Raises
    ------
    IOError
        Invalid destination directory.

    timeout
        URL request timed out.

    ValidationMultiprocessingError
        Multiprocessing failed.

    """
    from . import conf

    if url_list == 'default':
        url_list = conf.conesearch_urls

    if (not isinstance(destdir, six.string_types) or len(destdir) == 0 or
            os.path.exists(destdir) and not os.path.isdir(destdir)):
        raise IOError('Invalid destination directory')  # pragma: no cover

    if not os.path.exists(destdir):
        os.mkdir(destdir)

    # Output dir created by votable.validator
    out_dir = os.path.join(destdir, 'results')

    if not os.path.exists(out_dir):
        os.mkdir(out_dir)

    # Output files
    db_file = OrderedDict()
    db_file['good'] = os.path.join(destdir, 'conesearch_good.json')
    db_file['warn'] = os.path.join(destdir, 'conesearch_warn.json')
    db_file['excp'] = os.path.join(destdir, 'conesearch_exception.json')
    db_file['nerr'] = os.path.join(destdir, 'conesearch_error.json')

    # JSON dictionaries for output files
    js_tree = {}
    for key in db_file:
        js_tree[key] = vos_catalog.VOSDatabase.create_empty()

        # Delete existing files, if any, to be on the safe side.
        # Else can cause confusion if program exited prior to
        # new files being written but old files are still there.
        if os.path.exists(db_file[key]):  # pragma: no cover
            os.remove(db_file[key])
            if verbose:
                log.info('Existing file {0} deleted'.format(db_file[key]))

    # Master VO database from registry. Silence all the warnings.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        js_mstr = vos_catalog.VOSDatabase.from_registry(
            conf.conesearch_master_list, encoding='binary',
            show_progress=verbose)

    # Validate only a subset of the services.
    if url_list is not None:
        # Make sure URL is unique and fixed.
        url_list = set(map(
            unescape_all,
            [cur_url.encode('utf-8') if isinstance(cur_url, str) else cur_url
             for cur_url in url_list]))
        uniq_rows = len(url_list)
        url_list_processed = []  # To track if given URL is valid in registry
        if verbose:
            log.info('Only {0}/{1} site(s) are validated'.format(
                uniq_rows, len(js_mstr)))
    # Validate all services.
    else:
        uniq_rows = len(js_mstr)

    # Maps the full query URL (access URL + test query) to its registry key.
    key_lookup_by_url = {}

    # Process each catalog in the registry.
    for cur_key, cur_cat in js_mstr.get_catalogs():
        cur_url = cur_cat['url']

        # Skip if:
        #   a. not a Cone Search service
        #   b. not in given subset, if any
        if ((cur_cat['capabilityClass'] != b'ConeSearch') or
                (url_list is not None and cur_url not in url_list)):
            continue

        # Use testQuery to return non-empty VO table with max verbosity.
        testquery_pars = parse_cs(cur_cat['resourceID'])
        cs_pars_arr = ['='.join([key, testquery_pars[key]]).encode('utf-8')
                       for key in testquery_pars]
        cs_pars_arr += [b'VERB=3']

        # Track the service.
        key_lookup_by_url[cur_url + b'&'.join(cs_pars_arr)] = cur_key
        if url_list is not None:
            url_list_processed.append(cur_url)

    # Give warning if any of the user given subset is not in the registry.
    if url_list is not None:
        url_list_skipped = url_list - set(url_list_processed)
        n_skipped = len(url_list_skipped)
        if n_skipped > 0:
            warn_str = '{0} not found in registry! Skipped:\n'.format(
                n_skipped)
            warn_str += ''.join('\t{0}\n'.format(cur_url)
                                for cur_url in url_list_skipped)
            warnings.warn(warn_str, AstropyUserWarning)

    all_urls = list(key_lookup_by_url)
    timeout = data.conf.remote_timeout
    map_args = [(out_dir, url, timeout) for url in all_urls]

    # The worker pool (if any) is shared by the validation and the HTML
    # sub-index steps, and is always shut down in the ``finally`` below so
    # that worker processes are not leaked.
    pool = multiprocessing.Pool() if parallel else None

    try:
        # Validate URLs
        if parallel:
            try:
                mp_list = pool.map(_do_validation, map_args)
            except Exception as exc:  # pragma: no cover
                raise ValidationMultiprocessingError(
                    'An exception occurred during parallel processing '
                    'of validation results: {0}'.format(exc))
        else:
            # ``map`` here is ``six.moves.map``, i.e. a lazy one-shot
            # iterator. The results are iterated below AND handed to
            # ``result.get_result_subsets``, so they must be materialized;
            # otherwise the second consumer would see nothing.
            mp_list = list(map(_do_validation, map_args))

        # Categorize validation results
        for r in mp_list:
            db_key = r['out_db_name']
            cat_key = key_lookup_by_url[r.url]
            cur_cat = js_mstr.get_catalog(cat_key)
            _copy_r_to_cat(r, cur_cat)
            js_tree[db_key].add_catalog(cat_key, cur_cat)

        # Write to HTML
        html_subsets = result.get_result_subsets(mp_list, out_dir)
        html.write_index(html_subsets, all_urls, out_dir)

        if parallel:
            html_subindex_args = [(out_dir, html_subset, uniq_rows)
                                  for html_subset in html_subsets]
            pool.map(_html_subindex, html_subindex_args)
        else:
            for html_subset in html_subsets:
                _html_subindex((out_dir, html_subset, uniq_rows))
    finally:
        if pool is not None:
            pool.close()
            pool.join()

    # Write to JSON
    n = {}
    n_tot = 0
    for key in db_file:
        n[key] = len(js_tree[key])
        n_tot += n[key]
        js_tree[key].to_json(db_file[key], overwrite=True)
        if verbose:
            log.info('{0}: {1} catalog(s)'.format(key, n[key]))

    # Checksum
    if verbose:
        log.info('total: {0} out of {1} catalog(s)'.format(n_tot, uniq_rows))

    if n['good'] == 0:  # pragma: no cover
        warnings.warn(
            'No good sites available for Cone Search.', AstropyUserWarning)
def _do_validation(args):
    """Validate one URL; written as a single-argument worker for ``map``.

    Parameters
    ----------
    args : tuple
        ``(root, url, timeout)``, passed on to ``result.Result``.

    Returns
    -------
    r : `astropy.io.votable.validator.result.Result`
        The validation result, after categorization and after its HTML
        page has been written.

    """
    root, url, timeout = args

    votable.table.reset_vo_warnings()

    r = result.Result(url, root=root, timeout=timeout)
    r.validate_vo()

    _categorize_result(r)

    # This was already checked above.
    # Calling this again to get VOTableFile object to catch
    # well-formed error responses in downloaded XML.
    #
    # 'incorrect' is also added in case user wants to use
    # 'conesearch_warn.json' anyway.
    #
    # If using cached data, it will not detect network error
    # like the first run, but will raise exception.
    #
    # When SR is not 0, VOSError is raised for empty table.
    #
    if r['expected'] in ('good', 'incorrect') and r['nexceptions'] == 0:
        caught_errors = []

        with warnings.catch_warnings(record=True) as recorded:
            try:
                # Only the warnings/exceptions matter; the parsed table
                # itself is not needed.
                vos_catalog.vo_tab_parse(
                    votable.table.parse(r.get_vo_xml_path(), pedantic=False),
                    r.url, {})
            except (E19, IndexError, VOSError) as err:  # pragma: no cover
                caught_errors.append(str(err))

        # Recorded warnings first, then the caught exception text (if any).
        messages = [str(w.message) for w in recorded] + caught_errors
        parsed = [votable.exceptions.parse_vowarning(message)
                  for message in messages]  # pragma: no cover

        extra_warnings = sum(1 for info in parsed if info['is_warning'])
        # A caught exception counts once by itself, plus once more if its
        # message also parses as a VO exception.
        extra_exceptions = len(caught_errors) + sum(
            1 for info in parsed if info['is_exception'])
        seen_types = set(info['warning'] for info in parsed)

        r['nwarnings'] += extra_warnings
        r['nexceptions'] += extra_exceptions
        r['warnings'] += messages
        r['warning_types'] = r['warning_types'].union(seen_types)

        # Counts may have changed, so the category is recomputed.
        _categorize_result(r)

    html.write_result(r)
    return r
def _categorize_result(r):
    """Set success codes.

    Stores the output database name in ``r['out_db_name']`` and the
    matching status in ``r['expected']``.

    Parameters
    ----------
    r : `astropy.io.votable.validator.result.Result`

    Raises
    ------
    InvalidValidationAttribute
        Unhandled validation result attributes.

    """
    from . import conf

    # Each branch picks an (out_db_name, expected) pair; the conditions are
    # tested in this exact order and only as far as needed.
    if 'network_error' in r and r['network_error'] is not None:  # pragma: no cover
        verdict = ('nerr', 'broken')
    elif ((r['nexceptions'] == 0 and r['nwarnings'] == 0) or
            r['warning_types'].issubset(conf.noncritical_warnings)):
        verdict = ('good', 'good')
    elif r['nexceptions'] > 0:  # pragma: no cover
        verdict = ('excp', 'incorrect')
    elif r['nwarnings'] > 0:  # pragma: no cover
        verdict = ('warn', 'incorrect')
    else:  # pragma: no cover
        raise InvalidValidationAttribute(
            'Unhandled validation result attributes: {0}'.format(r._attributes))

    r['out_db_name'], r['expected'] = verdict
def _html_subindex(args):
    """Write one HTML index table; single-argument wrapper for ``map``.

    Parameters
    ----------
    args : tuple
        ``(out_dir, subset, total)``. ``subset`` is unpacked into the
        positional arguments of ``html.write_index_table`` that follow
        ``out_dir``; ``total`` is passed by keyword.

    """
    root_dir, subset_args, n_total = args
    html.write_index_table(root_dir, *subset_args, total=n_total)
def _copy_r_to_cat(r, cat):
    """Copy validation result attributes to given VO catalog.

    Every key in ``r._attributes`` is stored in ``cat`` under the same
    name prefixed with ``validate_``.

    Parameters
    ----------
    r : `astropy.io.votable.validator.result.Result`

    cat : `astropy.vo.client.vos_catalog.VOSCatalog`

    """
    attrs = r._attributes
    for name in attrs:
        cat['validate_' + name] = attrs[name]
|