1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
#############################################################
# #
# Author: Herve Menager #
# Organization:'Biological Software and Databases' Group, #
# Institut Pasteur, Paris. #
# Distributed under GPLv2 Licence. Please refer to the #
# COPYING.LIB document. #
# #
#############################################################
"""
Validator.py
This module is used to validate a Mobyle XML file (program, viewer,
workflow or tutorial) against the Relax NG schema and the Schematron rules
- The Validator class
"""
from lxml import etree
import os
from subprocess import Popen, PIPE
from Mobyle.ConfigManager import Config
_cfg = Config()
from tempfile import NamedTemporaryFile
from Mobyle.Parser import parseService
from logging import getLogger
v_log = getLogger(__name__)
def _mobyle_path(*parts):
    """Return the path of a resource located below the Mobyle home directory."""
    return os.path.join(_cfg.mobylehome(), *parts)


# Locations of the external resources needed by the validation steps.
mobschPath = _mobyle_path('Schema', 'mobyle.sch')
schprocPath = _mobyle_path('Tools', 'validation', 'iso_svrl.xsl')
rbPath = _mobyle_path('Tools', 'validation', 'remove_base.xsl')
jingPath = _mobyle_path('Tools', 'validation', 'jing', 'jing.jar')

# Namespace of the SVRL report produced by the Schematron validation, and the
# XPath that extracts the text of every failed assertion from such a report.
SVRL_NS = "http://purl.oclc.org/dsdl/svrl"
svrl_validation_errors = etree.XPath(
    '//svrl:failed-assert/svrl:text/text()',
    namespaces={'svrl': SVRL_NS},
)

# The Schematron rules are not applied directly: the iso_svrl.xsl stylesheet
# is first run over mobyle.sch, and its output is compiled as the XSLT that
# performs the actual validation.
schVal_transform = etree.XSLT(etree.parse(schprocPath))
mobVal_transform = etree.XSLT(schVal_transform(etree.parse(mobschPath)))

# Stylesheet applied to every document after XInclude processing.
rb_transform = etree.XSLT(etree.parse(rbPath))

# Parser created with lxml's "no_network" restriction switched off.
net_enabled_parser = etree.XMLParser(no_network=False)
class Validator(object):
    """Validate a Mobyle XML document against its schemas.

    The document is parsed once in the constructor (XIncludes are resolved
    and the result is passed through ``rb_transform``).  ``run()`` then
    applies the requested checks:

    - Relax NG validation with lxml (``runRNG``),
    - Relax NG validation with Jing, run as an external Java process
      (``runRNG_Jing``),
    - Schematron validation through ``mobVal_transform`` (``runSCH``).
    """

    # Jing output containing this text is not reported as an error.
    _XML_BASE_MESSAGE = 'found attribute "xml:base", but no attributes allowed here'
    # Text found on stderr when the JVM cannot load Jing from jing.jar.
    _JING_NOT_FOUND_MESSAGE = 'Could not find or load main class com.thaiopensource.relaxng.util.Driver'

    def __init__(self, type, docPath=None, docString=None, publicURI=None,
                 runRNG=True, runSCH=True, runRNG_Jing=False):
        """Parse the document and prepare the validators.

        :param type: kind of document; one of "program", "viewer",
            "workflow", "program_or_workflow" or "tutorial".  It selects the
            ``<type>.rng`` schema in the ``Schema`` directory.
        :param docPath: path of the XML file to validate.
        :param docString: XML source to validate; it is written to a
            temporary file which is removed when the object is destroyed.
            Exactly one of ``docPath`` and ``docString`` must be given.
        :param publicURI: if set, its base name is passed to the Schematron
            transformation as ``fileNameParameter``.
        :param runRNG: run the lxml Relax NG validation in ``run()``.
        :param runSCH: run the Schematron validation in ``run()``.
        :param runRNG_Jing: run the Jing Relax NG validation in ``run()``.
        :raises AssertionError: if both or none of ``docPath``/``docString``
            are given, or if ``type`` is not supported.
        """
        # Set first so that __del__ is safe even when construction fails below.
        self.file_is_temporary = False
        assert (docPath is None) != (docString is None), "Please specify either a path or a string. Your parameters:\n-path=%s\n-string=%s" % (docPath, docString)
        types = ["program", "viewer", "workflow", "program_or_workflow", "tutorial"]
        assert type in types, "Supported validator types are: %s - not %s" % (', '.join(types), type)
        self.publicURI = publicURI
        self.mobrngPath = os.path.join(_cfg.mobylehome(), 'Schema', '%s.rng' % type)
        rng_doc = etree.parse(self.mobrngPath)
        self.mobrngVal = etree.RelaxNG(rng_doc)
        self.runRNG = runRNG
        self.runRNG_Jing = runRNG_Jing
        self.runSCH = runSCH
        # Same test as the assertion above, so that an empty path is not
        # mistaken for "an XML string was given".
        if docPath is not None:
            self.docPath = docPath
        else:
            # if we pass an XML string, create a temporary file which
            # will be parsed and passed to different validation steps
            tmpXmlFile = NamedTemporaryFile(delete=False)
            # Record the path and raise the flag before writing, so that the
            # file is removed upon object destruction even if the write fails.
            self.docPath = tmpXmlFile.name
            self.file_is_temporary = True
            try:
                tmpXmlFile.write(docString)
            finally:
                tmpXmlFile.close()
        self.doc = etree.parse(self.docPath, parser=net_enabled_parser)
        self.doc.xinclude()
        self.doc = rb_transform(self.doc)

    def run(self):
        """Run the enabled validation steps and store their results.

        Depending on the flags given to the constructor this sets:

        - ``rngOk`` / ``rngErrors`` (lxml Relax NG),
        - ``rng_JingOk`` / ``rng_JingErrors`` (Jing),
        - ``eval_report`` / ``schErrors`` / ``schOk`` (Schematron),

        and always ``valNotOk`` / ``valOk``, which summarise the lxml
        Relax NG and Schematron results only (the Jing result is not part
        of the summary).
        """
        if self.runRNG:
            self.rngOk = self.mobrngVal.validate(self.doc)
            # Only the last error is kept; this is [None] for a valid document.
            self.rngErrors = [self.mobrngVal.error_log.last_error]
        if self.runRNG_Jing:
            self._run_jing()
        if self.runSCH:
            topLevelParams = {}
            if self.publicURI:
                # NOTE(review): lxml evaluates plain string parameters as
                # XPath expressions; etree.XSLT.strparam() may be what is
                # intended here - confirm against mobyle.sch before changing.
                topLevelParams = {'fileNameParameter': os.path.basename(self.publicURI)}
            self.eval_report = mobVal_transform(self.doc, **topLevelParams)
            self.schErrors = [s.strip() for s in svrl_validation_errors(self.eval_report)]
            self.schOk = len(self.schErrors) == 0
        self.valNotOk = (self.runRNG and not(self.rngOk)) or (self.runSCH and not(self.schOk))
        self.valOk = not(self.valNotOk)

    def _run_jing(self):
        """Validate the document file with Jing and store the outcome.

        Sets ``rng_JingErrors`` to the lines printed by Jing (except the
        ``xml:base`` ones) and ``rng_JingOk`` to True when there are none.
        If the Java process cannot be started, the error is logged,
        ``rng_JingOk`` is False and ``rng_JingErrors`` holds the error text.
        """
        # Build the argument list directly: splitting a formatted string on
        # spaces breaks as soon as one of the paths contains a space.
        val_cmd = [
            'java',
            '-Dorg.apache.xerces.xni.parser.XMLParserConfiguration=org.apache.xerces.parsers.XIncludeParserConfiguration',
            '-cp', jingPath,
            'com.thaiopensource.relaxng.util.Driver',
            self.mobrngPath,
            self.docPath,
        ]
        try:
            process = Popen(val_cmd, shell=False, stdout=PIPE, stderr=PIPE)
            # communicate() drains both pipes at once; reading stdout to the
            # end before stderr can block forever when stderr fills up.
            out, err = process.communicate()
        except OSError as ose:
            # handle errors like "java not found"...
            v_log.error('error during Jing validation: %s' % str(ose))
            # Always define the result attributes so callers can read them.
            self.rng_JingOk = False
            self.rng_JingErrors = [str(ose)]
            return
        # splitlines(True) keeps the line endings, as iterating a pipe does.
        messages = [line for line in out.splitlines(True)
                    if self._XML_BASE_MESSAGE not in line]
        for line in err.splitlines(True):
            if self._JING_NOT_FOUND_MESSAGE in line:
                # handle errors like "jing not found"
                # NOTE(review): the line is only logged, so rng_JingOk can
                # still be True although nothing was validated.
                v_log.error('error during Jing validation: %s' % str(line))
                break
            if self._XML_BASE_MESSAGE not in line:
                messages.append(line)
        self.rng_JingOk = len(messages) == 0
        self.rng_JingErrors = messages

    @property
    def service_name(self):
        """Text of the first ``/*/head/name`` element of the document.

        :raises IndexError: if the document has no such element.
        """
        return self.doc.xpath('/*/head/name/text()')[0]

    @property
    def service(self):
        """Service object parsed from the document file by ``parseService``.

        Its ``simulation_path`` is set to the document path and its ``pid``
        to ``service_name``.  The file is parsed again on every access.
        """
        service = parseService(self.docPath)
        service.simulation_path = self.docPath
        service.pid = self.service_name
        return service

    def __del__(self):
        """Remove the temporary file created for a ``docString`` document."""
        # getattr: the flag does not exist if __init__ never ran.
        if not getattr(self, 'file_is_temporary', False):
            return
        # Lower the flag first so that a second call does nothing.
        self.file_is_temporary = False
        try:
            os.remove(self.docPath)
        except OSError:
            # The file is already gone or cannot be removed; a destructor
            # has no caller to report this to.
            pass
|