File: Validator.py

package info (click to toggle)
mobyle 1.5.5%2Bdfsg-6
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 8,288 kB
  • sloc: python: 22,709; makefile: 35; sh: 33; ansic: 10; xml: 6
file content (135 lines) | stat: -rw-r--r-- 6,111 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#############################################################
#                                                           #
#   Author: Herve Menager                                   #
#   Organization:'Biological Software and Databases' Group, #
#                Institut Pasteur, Paris.                   #
#   Distributed under GPLv2 Licence. Please refer to the    #
#   COPYING.LIB document.                                   #
#                                                           #
#############################################################
"""
Validator.py

This module validates a Mobyle XML service description (program, viewer,
workflow or tutorial) against its Relax NG schema and the Schematron rules.
It provides the Validator class.
"""
from lxml import etree
import os
from subprocess import Popen, PIPE
from Mobyle.ConfigManager import Config
_cfg = Config()
from tempfile import NamedTemporaryFile

from  Mobyle.Parser import parseService

from logging import getLogger
v_log = getLogger(__name__)

# Locations of the external validation resources. Everything lives under
# the Mobyle installation directory reported by the configuration.
_mobyle_home = _cfg.mobylehome()
_validation_dir = os.path.join(_mobyle_home, 'Tools', 'validation')

mobschPath = os.path.join(_mobyle_home, 'Schema', 'mobyle.sch')
schprocPath = os.path.join(_validation_dir, 'iso_svrl.xsl')
rbPath = os.path.join(_validation_dir, 'remove_base.xsl')
jingPath = os.path.join(_validation_dir, 'jing', 'jing.jar')

# Compiled XPath that extracts the text of every failed assertion from an
# SVRL report.
SVRL_NS = "http://purl.oclc.org/dsdl/svrl"
svrl_validation_errors = etree.XPath(
    '//svrl:failed-assert/svrl:text/text()',
    namespaces={'svrl': SVRL_NS}
)

# Two-stage Schematron setup: the generator stylesheet is applied to the
# Mobyle Schematron rules, and its output is compiled in turn into the
# stylesheet that actually validates documents.
_schematron_generator = etree.parse(schprocPath)
schVal_transform = etree.XSLT(_schematron_generator)
_mobyle_rules_xsl = schVal_transform(etree.parse(mobschPath))
mobVal_transform = etree.XSLT(_mobyle_rules_xsl)

# Stylesheet loaded from remove_base.xsl (presumably strips xml:base
# attributes -- confirm against the stylesheet itself).
rb_transform = etree.XSLT(etree.parse(rbPath))

# Parser that is allowed to fetch external documents over the network.
net_enabled_parser = etree.XMLParser(no_network=False)

class Validator(object):
    """Validate one Mobyle XML document against its schemas.

    The document is given either as a file path or as an XML string (the
    string is written to a temporary file that is removed when the object
    is finalized). Up to three checks are performed by run():

    - Relax NG validation through lxml (``runRNG``),
    - Relax NG validation through the external Jing jar (``runRNG_Jing``),
    - Schematron validation (``runSCH``).

    Attributes set by run(), depending on the enabled checks:

    - ``rngOk`` / ``rngErrors``: lxml Relax NG verdict and a one-element
      list holding the last entry of the validator's error log.
    - ``rng_JingOk`` / ``rng_JingErrors``: Jing verdict and the retained
      output lines. Left unset when ``java`` could not be started.
    - ``eval_report`` / ``schErrors`` / ``schOk``: SVRL report, stripped
      failed-assertion texts, and the Schematron verdict.
    - ``valNotOk`` / ``valOk``: overall verdict, computed from the lxml
      Relax NG and Schematron checks only (Jing is not taken into account).
    """

    # Jing complaint that is filtered out of the reported messages.
    _JING_XML_BASE_NOISE = 'found attribute "xml:base", but no attributes allowed here'
    # Message printed by the JVM when the Jing driver class cannot be loaded.
    _JING_LAUNCH_FAILURE = 'Could not find or load main class com.thaiopensource.relaxng.util.Driver'

    def __init__(self, type, docPath=None, docString=None, publicURI=None,runRNG=True, runSCH=True, runRNG_Jing=False):
        """Load the schema for *type* and parse the document.

        :param type: one of "program", "viewer", "workflow",
                     "program_or_workflow", "tutorial"; selects the
                     ``<type>.rng`` schema file.
        :param docPath: path of the XML file to validate.
        :param docString: XML content to validate (exclusive with docPath).
        :param publicURI: if set, its basename is handed to the Schematron
                          stylesheet as ``fileNameParameter``.
        :param runRNG: run the lxml Relax NG validation.
        :param runSCH: run the Schematron validation.
        :param runRNG_Jing: run the Relax NG validation with Jing.
        :raises AssertionError: if not exactly one of docPath/docString is
                                given, or if *type* is not supported.
        """
        # Set first so that __del__ is safe even if construction fails below.
        self.file_is_temporary = False

        # Raised explicitly (rather than with ``assert``) so that the checks
        # survive ``python -O`` while keeping the same exception type.
        if (docPath is None) == (docString is None):
            raise AssertionError("Please specify either a path or a string. Your parameters:\n-path=%s\n-string=%s" %(docPath, docString))
        types = ["program", "viewer", "workflow", "program_or_workflow", "tutorial"]
        if type not in types:
            raise AssertionError("Supported validator types are: %s - not %s" % (', '.join(types), type))

        self.publicURI = publicURI
        self.mobrngPath = os.path.join(_cfg.mobylehome(),'Schema','%s.rng' % type)
        rng_doc = etree.parse(self.mobrngPath)
        self.mobrngVal = etree.RelaxNG(rng_doc)

        self.runRNG = runRNG
        self.runRNG_Jing = runRNG_Jing
        self.runSCH = runSCH

        if docPath is not None:
            self.docPath = docPath
        else:
            # An XML string was passed: store it in a temporary file, which
            # is what gets parsed and handed to the validation steps.
            tmpXmlFile = NamedTemporaryFile(delete=False)
            self.docPath = tmpXmlFile.name
            # Flag the path as temporary *before* writing, so the file is
            # still cleaned up on finalization if the write fails.
            self.file_is_temporary = True
            try:
                tmpXmlFile.write(docString)
            finally:
                tmpXmlFile.close()
        self.doc = etree.parse(self.docPath, parser=net_enabled_parser)
        self.doc.xinclude()
        self.doc = rb_transform(self.doc)

    def run(self):
        """Run the enabled validations and store their results on self."""
        if self.runRNG:
            self.rngOk = self.mobrngVal.validate(self.doc)
            # NOTE(review): this is a one-element list even on success, in
            # which case the element is presumably None -- kept as is for
            # callers that rely on it.
            self.rngErrors = [self.mobrngVal.error_log.last_error]
        if self.runRNG_Jing:
            self._run_jing()
        if self.runSCH:
            topLevelParams = {}
            if self.publicURI:
                # NOTE(review): lxml evaluates XSLT parameters as XPath
                # expressions; a plain string may need etree.XSLT.strparam().
                # Left unchanged, confirm against mobyle.sch.
                topLevelParams = {'fileNameParameter': os.path.basename(self.publicURI)}
            self.eval_report = mobVal_transform(self.doc, **topLevelParams)
            self.schErrors = [s.strip() for s in svrl_validation_errors(self.eval_report)]
            self.schOk = len(self.schErrors) == 0

        # The Jing result deliberately stays out of the overall verdict,
        # as in the original implementation.
        self.valNotOk = (self.runRNG and not self.rngOk) or (self.runSCH and not self.schOk)
        self.valOk = not self.valNotOk

    def _run_jing(self):
        """Validate the document file with Jing; set rng_JingOk/rng_JingErrors."""
        # Build argv as a list so that paths containing spaces stay intact.
        val_cmd = [
            'java',
            '-Dorg.apache.xerces.xni.parser.XMLParserConfiguration=org.apache.xerces.parsers.XIncludeParserConfiguration',
            '-cp', jingPath,
            'com.thaiopensource.relaxng.util.Driver',
            self.mobrngPath,
            self.docPath,
        ]
        try:
            # Text mode keeps the substring checks below working on str.
            process = Popen(val_cmd, shell=False, stdout=PIPE, stderr=PIPE,
                            universal_newlines=True)
            # communicate() drains both pipes concurrently; reading stdout to
            # the end before stderr can deadlock once a pipe buffer fills up.
            out, err = process.communicate()
        except OSError as ose:
            # handle errors like "java not found"...
            v_log.error('error during Jing validation: %s', ose)
            return

        messages = [line for line in out.splitlines(True)
                    if self._JING_XML_BASE_NOISE not in line]
        for line in err.splitlines(True):
            if self._JING_LAUNCH_FAILURE in line:
                # handle errors like "jing not found"
                v_log.error('error during Jing validation: %s', line)
                break
            if self._JING_XML_BASE_NOISE not in line:
                messages.append(line)
        self.rng_JingOk = len(messages) == 0
        self.rng_JingErrors = messages

    @property
    def service_name(self):
        """Text of the first ``/*/head/name`` node of the parsed document."""
        return self.doc.xpath('/*/head/name/text()')[0]

    @property
    def service(self):
        """Service object parsed from the document file.

        Its ``simulation_path`` is set to the document path and its ``pid``
        to the service name.
        """
        service = parseService(self.docPath)
        service.simulation_path = self.docPath
        service.pid = self.service_name
        return service

    def __del__(self):
        """Remove the temporary document file, if this object created one."""
        # getattr: the attribute is missing on instances that never went
        # through __init__.
        if not getattr(self, 'file_is_temporary', False):
            return
        # Clear the flag first so a repeated finalization is a no-op.
        self.file_is_temporary = False
        try:
            os.remove(self.docPath)
        except OSError:
            # Best effort: the file may already be gone.
            pass