File: validate.py

package info (click to toggle)
python-astropy 1.3-8~bpo8%2B2
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 44,292 kB
  • sloc: ansic: 160,360; python: 137,322; sh: 11,493; lex: 7,638; yacc: 4,956; xml: 1,796; makefile: 474; cpp: 364
file content (343 lines) | stat: -rw-r--r-- 11,192 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Validate VO Services."""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
from ...extern import six
from ...extern.six.moves import map

# STDLIB
import multiprocessing
import os
import warnings
from collections import OrderedDict

# LOCAL
from .exceptions import (ValidationMultiprocessingError,
                         InvalidValidationAttribute)
from ..client import vos_catalog
from ..client.exceptions import VOSError
from ...io import votable
from ...io.votable.exceptions import E19
from ...io.votable.validator import html, result
from ...logger import log
from ...utils import data
from ...utils.exceptions import AstropyUserWarning
from ...utils.timer import timefunc
from ...utils.xml.unescaper import unescape_all

# Temporary solution until STScI VAO registry formally provides
# <testQuery> tags
from .tstquery import parse_cs


__all__ = ['check_conesearch_sites']


@timefunc(1)
def check_conesearch_sites(destdir=os.curdir, verbose=True, parallel=True,
                           url_list='default'):
    """Validate Cone Search Services.

    .. note::

        URLs are unescaped prior to validation.

        Only check queries with ``<testQuery>`` parameters.
        Does not perform meta-data and erroneous queries.

    Parameters
    ----------
    destdir : str, optional
        Directory to store output files. Will be created if does
        not exist. Existing files with these names will be deleted
        or replaced:

            * conesearch_good.json
            * conesearch_warn.json
            * conesearch_exception.json
            * conesearch_error.json

    verbose : bool, optional
        Print extra info to log.

    parallel : bool, optional
        Enable multiprocessing. The worker pool is shut down before
        this function returns, whether it succeeds or raises.

    url_list : list of string, optional
        Only check these access URLs against
        `astropy.vo.validator.Conf.conesearch_master_list` and ignore
        the others, which will not appear in output files.  By
        default, check those in
        `astropy.vo.validator.Conf.conesearch_urls`.  If `None`, check
        everything.

    Raises
    ------
    IOError
        Invalid destination directory.

    timeout
        URL request timed out.

    ValidationMultiprocessingError
        Multiprocessing failed.

    """
    from . import conf

    if url_list == 'default':
        url_list = conf.conesearch_urls

    if (not isinstance(destdir, six.string_types) or len(destdir) == 0 or
            os.path.exists(destdir) and not os.path.isdir(destdir)):
        raise IOError('Invalid destination directory')  # pragma: no cover

    if not os.path.exists(destdir):
        os.mkdir(destdir)

    # Output dir created by votable.validator
    out_dir = os.path.join(destdir, 'results')

    if not os.path.exists(out_dir):
        os.mkdir(out_dir)

    # Output files
    db_file = OrderedDict()
    db_file['good'] = os.path.join(destdir, 'conesearch_good.json')
    db_file['warn'] = os.path.join(destdir, 'conesearch_warn.json')
    db_file['excp'] = os.path.join(destdir, 'conesearch_exception.json')
    db_file['nerr'] = os.path.join(destdir, 'conesearch_error.json')

    # JSON dictionaries for output files
    js_tree = {}
    for key in db_file:
        js_tree[key] = vos_catalog.VOSDatabase.create_empty()

        # Delete existing files, if any, to be on the safe side.
        # Else can cause confusion if program exited prior to
        # new files being written but old files are still there.
        if os.path.exists(db_file[key]):  # pragma: no cover
            os.remove(db_file[key])
            if verbose:
                log.info('Existing file {0} deleted'.format(db_file[key]))

    # Master VO database from registry. Silence all the warnings.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        js_mstr = vos_catalog.VOSDatabase.from_registry(
            conf.conesearch_master_list, encoding='binary',
            show_progress=verbose)

    # Validate only a subset of the services.
    if url_list is not None:
        # Make sure URL is unique and fixed.
        url_list = set(map(unescape_all,
            [cur_url.encode('utf-8') if isinstance(cur_url, str) else cur_url
             for cur_url in url_list]))
        uniq_rows = len(url_list)
        url_list_processed = []  # To track if given URL is valid in registry
        if verbose:
            log.info('Only {0}/{1} site(s) are validated'.format(
                uniq_rows, len(js_mstr)))
    # Validate all services.
    else:
        uniq_rows = len(js_mstr)

    key_lookup_by_url = {}

    # Process each catalog in the registry.
    for cur_key, cur_cat in js_mstr.get_catalogs():
        cur_url = cur_cat['url']

        # Skip if:
        #   a. not a Cone Search service
        #   b. not in given subset, if any
        if ((cur_cat['capabilityClass'] != b'ConeSearch') or
                (url_list is not None and cur_url not in url_list)):
            continue

        # Use testQuery to return non-empty VO table with max verbosity.
        testquery_pars = parse_cs(cur_cat['resourceID'])
        cs_pars_arr = ['='.join([key, testquery_pars[key]]).encode('utf-8')
                       for key in testquery_pars]
        cs_pars_arr += [b'VERB=3']

        # Track the service.
        key_lookup_by_url[cur_url + b'&'.join(cs_pars_arr)] = cur_key
        if url_list is not None:
            url_list_processed.append(cur_url)

    # Give warning if any of the user given subset is not in the registry.
    if url_list is not None:
        url_list_skipped = url_list - set(url_list_processed)
        n_skipped = len(url_list_skipped)
        if n_skipped > 0:
            warn_str = (
                '{0} not found in registry! Skipped:\n'.format(n_skipped) +
                ''.join('\t{0}\n'.format(cur_url)
                        for cur_url in url_list_skipped))
            warnings.warn(warn_str, AstropyUserWarning)

    all_urls = list(key_lookup_by_url)
    timeout = data.conf.remote_timeout
    map_args = [(out_dir, url, timeout) for url in all_urls]

    # The pool is reused for the HTML sub-index step below, so it is
    # created once here and always shut down in the ``finally`` clause;
    # otherwise the worker processes would linger after this call.
    pool = multiprocessing.Pool() if parallel else None
    try:
        # Validate URLs
        if pool is not None:
            try:
                mp_list = pool.map(_do_validation, map_args)
            except Exception as exc:  # pragma: no cover
                raise ValidationMultiprocessingError(
                    'An exception occurred during parallel processing '
                    'of validation results: {0}'.format(exc))
        else:
            # Must be a real list, not a lazy ``map`` iterator: the results
            # are traversed twice (categorization, then HTML subsets) and
            # an iterator would already be exhausted on the second pass.
            mp_list = [_do_validation(cur_args) for cur_args in map_args]

        # Categorize validation results
        for r in mp_list:
            db_key = r['out_db_name']
            cat_key = key_lookup_by_url[r.url]
            cur_cat = js_mstr.get_catalog(cat_key)
            _copy_r_to_cat(r, cur_cat)
            js_tree[db_key].add_catalog(cat_key, cur_cat)

        # Write to HTML
        html_subsets = result.get_result_subsets(mp_list, out_dir)
        html.write_index(html_subsets, all_urls, out_dir)
        if pool is not None:
            html_subindex_args = [(out_dir, html_subset, uniq_rows)
                                  for html_subset in html_subsets]
            pool.map(_html_subindex, html_subindex_args)
        else:
            for html_subset in html_subsets:
                _html_subindex((out_dir, html_subset, uniq_rows))
    finally:
        if pool is not None:
            # Every ``pool.map`` call above is blocking, so on success no
            # work is outstanding; on failure there is nothing worth
            # waiting for. Stop the workers and reap them.
            pool.terminate()
            pool.join()

    # Write to JSON
    n = {}
    n_tot = 0
    for key in db_file:
        n[key] = len(js_tree[key])
        n_tot += n[key]
        js_tree[key].to_json(db_file[key], overwrite=True)
        if verbose:
            log.info('{0}: {1} catalog(s)'.format(key, n[key]))

    # Checksum
    if verbose:
        log.info('total: {0} out of {1} catalog(s)'.format(n_tot, uniq_rows))

    if n['good'] == 0:  # pragma: no cover
        warnings.warn(
            'No good sites available for Cone Search.', AstropyUserWarning)


def _do_validation(args):
    """Validate a single URL; written to be mapped over by a process pool.

    Parameters
    ----------
    args : tuple
        ``(root, url, timeout)`` packed into one argument. All three are
        handed to `astropy.io.votable.validator.result.Result`.

    Returns
    -------
    r : `astropy.io.votable.validator.result.Result`
        Validation result after categorization. It has also been passed
        to the HTML result writer.

    """
    out_root, query_url, max_wait = args

    votable.table.reset_vo_warnings()

    r = result.Result(query_url, root=out_root, timeout=max_wait)
    r.validate_vo()

    _categorize_result(r)

    # The XML was already checked by ``validate_vo`` above.
    # It is parsed a second time here to get a VOTableFile object, so
    # that well-formed error responses in the downloaded XML are caught.
    #
    # 'incorrect' is included too, in case the user wants to use
    # 'conesearch_warn.json' anyway.
    #
    # With cached data, a network error will not be detected the way it
    # is on the first run, but an exception will be raised.
    #
    # When SR is not 0, VOSError is raised for an empty table.
    #
    if r['expected'] in ('good', 'incorrect') and r['nexceptions'] == 0:
        extra_excp = 0
        failure_msgs = []

        with warnings.catch_warnings(record=True) as caught:
            try:
                # Only the warnings/exceptions matter; the table is unused.
                vos_catalog.vo_tab_parse(
                    votable.table.parse(r.get_vo_xml_path(), pedantic=False),
                    r.url, {})
            except (E19, IndexError, VOSError) as err:  # pragma: no cover
                failure_msgs.append(str(err))
                extra_excp += 1

        # Recorded warnings first, then any exception message.
        messages = [str(item.message) for item in caught] + failure_msgs

        parsed = [votable.exceptions.parse_vowarning(msg)
                  for msg in messages]  # pragma: no cover
        extra_warn = sum(1 for info in parsed if info['is_warning'])
        extra_excp += sum(1 for info in parsed if info['is_exception'])
        seen_types = set(info['warning'] for info in parsed)

        r['nwarnings'] += extra_warn
        r['nexceptions'] += extra_excp
        r['warnings'] += messages
        r['warning_types'] = r['warning_types'].union(seen_types)

        # Counts may have changed, so the category is recomputed.
        _categorize_result(r)

    html.write_result(r)
    return r


def _categorize_result(r):
    """Assign the output database name and expected status to a result.

    The ``'out_db_name'`` and ``'expected'`` entries of ``r`` are set,
    using the first rule that matches:

    * network error recorded: ``'nerr'`` / ``'broken'``
    * no exceptions and no warnings, or all warning types are listed in
      ``conf.noncritical_warnings``: ``'good'`` / ``'good'``
    * at least one exception: ``'excp'`` / ``'incorrect'``
    * at least one warning: ``'warn'`` / ``'incorrect'``

    Parameters
    ----------
    r : `astropy.io.votable.validator.result.Result`

    Raises
    ------
    InvalidValidationAttribute
        Unhandled validation result attributes.

    """
    from . import conf

    def _classify():
        # Rules are checked lazily and in order; the first hit wins.
        if 'network_error' in r and r['network_error'] is not None:  # pragma: no cover
            return 'nerr', 'broken'

        is_clean = r['nexceptions'] == 0 and r['nwarnings'] == 0
        if is_clean or r['warning_types'].issubset(conf.noncritical_warnings):
            return 'good', 'good'

        if r['nexceptions'] > 0:  # pragma: no cover
            return 'excp', 'incorrect'

        if r['nwarnings'] > 0:  # pragma: no cover
            return 'warn', 'incorrect'

        raise InvalidValidationAttribute(  # pragma: no cover
            'Unhandled validation result attributes: {0}'.format(r._attributes))

    r['out_db_name'], r['expected'] = _classify()


def _html_subindex(args):
    """Write one HTML index table; written to be mapped over by a pool.

    Parameters
    ----------
    args : tuple
        ``(out_dir, subset, total)`` packed into one argument. ``subset``
        is unpacked into positional arguments for the writer.

    """
    dest_dir, subset_fields, n_total = args
    html.write_index_table(dest_dir, *subset_fields, total=n_total)


def _copy_r_to_cat(r, cat):
    """Copy validation result attributes to given VO catalog.

    Parameters
    ----------
    r : `astropy.io.votable.validate.result.Result`

    cat : `astropy.vo.client.vos_catalog.VOSCatalog`

    """
    for key in r._attributes:
        new_key = 'validate_' + key
        cat[new_key] = r._attributes[key]