File: BMPSWorkflow.py

package info (click to toggle)
mobyle 1.5.5+dfsg-6
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 8,288 kB
  • sloc: python: 22,709; makefile: 35; sh: 33; ansic: 10; xml: 6
file content (541 lines) | stat: -rw-r--r-- 23,111 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
'''
Created on Dec 17, 2012

@author: hmenager
'''

from lxml import etree
import logging
import shutil
import urllib, os
import pygraphviz as pgv  # @UnresolvedImport
from glob import glob

from Workflow import Task, Parameter, Paragraph, Link, Type, \
    Datatype, Biotype, Value, VElem, Vlist, Parser
from Parser import parseService
from Mobyle.Registry import registry, WorkflowDef
from InterfacePreprocessor import InterfacePreprocessor
from MobyleError import MobyleError
# Logger shared with the rest of the Mobyle code base.
# (The module-wide XML ``parser`` is created below, once CustomResolver exists.)
log = logging.getLogger('Mobyle')

class CustomResolver(etree.Resolver):
    """
    lxml Resolver delegating the retrieval of external resources to urllib.

    libxml2 cannot load HTTPS URLs by itself; opening them through
    ``urllib.urlopen`` lets documents and stylesheets reference HTTPS
    resources too (among other protocols urllib understands).
    """

    def resolve(self, url, public_id, context):
        """Open ``url`` with urllib and hand the resulting stream to lxml."""
        remote_stream = urllib.urlopen(url)
        return self.resolve_file(remote_stream, context)

# Module-wide XML parser: network access is allowed (no_network=False) and
# external resources are fetched through CustomResolver, which adds HTTPS
# support that libxml2 lacks. It is used to parse the XSLT stylesheets in
# xslProcess and the tasks files handled by BMPSWorkflow.
parser = etree.XMLParser(no_network=False)
parser.resolvers.add(CustomResolver())

def xslProcess(xsl_file, xml_source, xml_target, params=None):
    """
    Apply an XSLT stylesheet to an XML document.

    xsl_file -- path/URL of the XSLT stylesheet; it is parsed with the
                module-level ``parser`` so that external resources go
                through CustomResolver
    xml_source -- path/URL of the XML document to transform
    xml_target -- path of the file receiving the result; if it is empty or
                  None, the result is returned as a string instead
    params -- optional mapping of XSLT parameter names to values passed to
              the transformation (callers quote string values, e.g. "'x'")

    Returns the serialized result when xml_target is not given, else None.
    """
    if params is None:
        params = {}
    xslt_doc = etree.parse(xsl_file, parser)
    transform = etree.XSLT(xslt_doc)
    # Use a dedicated parser for the source document instead of rebinding the
    # module-level ``parser``: rebinding it would silently drop CustomResolver
    # for every later user of that parser.
    source_parser = etree.XMLParser(no_network=False)
    source_doc = etree.parse(xml_source, source_parser)
    result = transform(source_doc, **params)
    if xml_target:
        with open(xml_target, "w") as target_file:
            target_file.write(str(result))
        return None
    return str(result)

class ServiceNotFoundError(MobyleError):
    """
    Raised when no program or workflow of the registry matches a service pid.
    """
    def __init__(self, pid):
        """
        pid: identifier of the service that was looked up
        """
        self.pid = pid

    @property
    def message(self):
        # the previous wording ("cannot not be found") stated the opposite
        return "Service %s cannot be found on the server" % self.pid

class RenameError(MobyleError):
    """
    Raised when trying to rename a workflow to a name which already exists
    """
    def __init__(self, old_name, new_name):
        """
        old_name: old workflow name
        new_name: requested new workflow name
        """
        self.old_name = old_name
        self.new_name = new_name

    @property
    def message(self):
        return "Workflow %s cannot be renamed to %s, which already exists" % (self.old_name, self.new_name)

class CopyError(MobyleError):
    """
    Raised when a workflow cannot be copied into another session because
    every candidate name for the copy is already taken there
    """
    def __init__(self, old_name):
        """
        old_name: name of the workflow that was being copied
        """
        self.old_name = old_name

    @property
    def message(self):
        return "Workflow %s cannot be copied into the new session, workflow already exists" % (self.old_name)


def get_service(pid):
    """
    Look up a service (program or workflow) in the registry by its pid.

    pid -- either "<name>" for a service of the 'local' server, or
           "<server>.<name>"; empty parts around the first '.' are ignored
    Programs are searched first, then workflows.

    Raises ServiceNotFoundError if the server or the service is unknown, or
    if pid contains no name at all.
    """
    # split on the first '.' only and drop empty parts / the separator itself
    parts = [part for part in pid.partition('.') if part and part != '.']
    if not parts:
        raise ServiceNotFoundError(pid)
    if len(parts) == 1:
        server_name, service_name = 'local', parts[0]
    else:
        server_name, service_name = parts
    try:
        server = registry.serversByName[server_name]
        try:
            return server.programsByName[service_name]
        except KeyError:
            return server.workflowsByName[service_name]
    except KeyError:
        raise ServiceNotFoundError(pid)

class BMPSWorkflow(object):
    """
    A user-defined (BMW) workflow stored in a user session.

    Each workflow is backed by files in the session's ``BMW`` folder:
      - ``<name>.graphml``: the graph edited by the user, from which the
        other definition is generated,
      - ``<name>_mobyle.xml``: the Mobyle workflow definition,
      - ``<name>.tasks.xml`` (optional): parameter values set on the tasks.
    """

    BMW_FOLDER = 'BMW'

    PID_PREFIX = 'my_workflow_'

    MOBYLEXML_SUFFIX = '_mobyle.xml'

    # GRAPHML document used to initialise a brand new workflow
    EMPTY_GRAPHML = '<graphml><graph></graph></graphml>'

    def __init__(self, name, session):
        """
        name -- name of the workflow
        session -- user session containing the workflow (must provide
                   ``getDir()`` and ``url``)

        If the GRAPHML file of the workflow does not exist yet, an empty one
        is created and the Mobyle XML definition is generated from it.
        """
        self.name = name
        self.session = session
        if not os.path.exists(self.graphml_filepath):
            # create source workflow if it does not exist
            with open(self.graphml_filepath, 'wb') as graphml_file:
                graphml_file.write(self.EMPTY_GRAPHML)
            self.graphml_to_mobylexml()

    @classmethod
    def get_from_mobylexml_filepath(cls, filepath, session):
        """
        Factory building the workflow from the path of its Mobyle XML file
        (``<folder>/<name>_mobyle.xml``).
        """
        name = os.path.basename(filepath)
        if name.endswith(cls.MOBYLEXML_SUFFIX):
            name = name[:-len(cls.MOBYLEXML_SUFFIX)]
        return cls(name, session)

    @classmethod
    def get_from_pid(cls, pid, session):
        """
        Factory building the workflow from its pid (PID_PREFIX + name).

        Only a leading PID_PREFIX is removed, so names that themselves contain
        the prefix are preserved.
        """
        name = pid
        if name.startswith(cls.PID_PREFIX):
            name = name[len(cls.PID_PREFIX):]
        return cls(name, session)

    @classmethod
    def get_user_workflows(cls, session):
        """
        Factory listing the workflows of a user session, i.e. one workflow per
        ``*_mobyle.xml`` file of its BMW folder.
        """
        pattern = os.path.join(session.getDir(), cls.BMW_FOLDER, '*' + cls.MOBYLEXML_SUFFIX)
        return [cls.get_from_mobylexml_filepath(filepath, session) for filepath in glob(pattern)]

    @classmethod
    def load_user_workflows(cls, session):
        """
        Add the BMW-defined workflows of a session to the registry (attached
        to the 'local' server, if the registry has one), so that they can be
        run in Mobyle.
        """
        user_workflows_list = cls.get_user_workflows(session)
        if 'local' not in registry.serversByName:
            return
        local_server = registry.serversByName['local']
        for workflow in user_workflows_list:
            workflow_def = WorkflowDef(name=workflow.pid,
                                       url=workflow.url,
                                       path=workflow.mobylexml_filepath,
                                       server=local_server)
            registry.addWorkflow(workflow_def)

    @property
    def workflows_folderpath(self):
        """
        folder of the user session containing all its workflows
        """
        return os.path.realpath(os.path.join(self.session.getDir(), self.BMW_FOLDER))

    @property
    def graphml_filename(self):
        """
        GRAPHML file name for this workflow
        """
        return "%s.graphml" % self.name

    @property
    def tasks_filename(self):
        """
        tasks file name for this workflow
        """
        return "%s.tasks.xml" % self.name

    @property
    def mobylexml_filename(self):
        """
        Mobyle XML file name for this workflow
        """
        return self.name + self.MOBYLEXML_SUFFIX

    @property
    def url(self):
        """
        Mobyle XML URL for this workflow
        """
        return "%s/%s/%s" % (self.session.url, self.BMW_FOLDER, self.mobylexml_filename)

    @property
    def graphml_filepath(self):
        """
        GRAPHML file path for this workflow
        """
        return os.path.realpath(os.path.join(self.workflows_folderpath, self.graphml_filename))

    @property
    def tasks_filepath(self):
        """
        tasks values file path for this workflow
        """
        return os.path.realpath(os.path.join(self.workflows_folderpath, self.tasks_filename))

    @property
    def mobylexml_filepath(self):
        """
        Mobyle XML file path for this workflow
        """
        return os.path.realpath(os.path.join(self.workflows_folderpath, self.mobylexml_filename))

    @property
    def pid(self):
        """
        identifier of the workflow in the registry (PID_PREFIX + name)
        """
        return self.PID_PREFIX + self.name

    def graphviz_layout(self):
        """
        Perform automatic layout using graphviz for this workflow.

        The GRAPHML file is converted to DOT, laid out with the ``dot``
        program, and the resulting client-side image map is merged back into
        the GRAPHML file. Returns the updated GRAPHML contents.
        """
        wf_dot_filepath = os.path.join(self.workflows_folderpath, self.name + '.dot')
        wf_css_filepath = os.path.join(self.workflows_folderpath, self.name + 'cmap.css')
        xslProcess('graphml_to_dot.xsl', self.graphml_filepath, wf_dot_filepath)
        G = pgv.AGraph(wf_dot_filepath)
        G.layout(prog='dot')
        G.draw(wf_css_filepath, format='cmapx')
        # the XSLT parameter is an XPath expression, hence the quoting
        xslProcess("update_graphml_with_graphviz_layout.xsl", self.graphml_filepath,
                   self.graphml_filepath, params={'cssPath': "'" + wf_css_filepath + "'"})
        return self.get_graphml()

    def update_graphml(self, graphml_contents):
        """
        Update the workflow using a new GRAPHML file
        """
        with open(self.graphml_filepath, 'w') as graphml_file:
            graphml_file.write(graphml_contents)
        self.graphml_to_mobylexml()

    def graphml_to_mobylexml(self):
        """
        Regenerate the Mobyle XML definition of the workflow from its
        up-to-date GRAPHML file.

        Steps:
          1. transform the GRAPHML file with ``graphml_to_wf.xsl``;
          2. merge the task values stored in the tasks file (if any) into the
             corresponding ``flow/task`` elements;
          3. for each task, in data-flow order, expose as workflow input
             parameters (one paragraph per task) the task inputs not fed by a
             link, and complete the description of the workflow outputs;
          4. write the result and run the InterfacePreprocessor on it, to
             enable display of BMW jobs in the Mobyle Portal.
        """
        xslProcess('graphml_to_wf.xsl', self.graphml_filepath, self.mobylexml_filepath)
        with open(self.mobylexml_filepath, 'r') as mobylexml_file:
            mobylexml_contents = mobylexml_file.read()
        mobyle_parser = Parser()
        wf_tree = mobyle_parser.XML(mobylexml_contents)
        self._merge_task_values(wf_tree)
        # re-parse the merged document to work on the workflow object model
        mobyle_parser = Parser()
        wf = mobyle_parser.XML(mobyle_parser.tostring(wf_tree))
        for task in self._order_tasks(wf):
            self._add_task_parameters(wf, task)
        with open(self.mobylexml_filepath, 'w') as mobylexml_file:
            mobylexml_file.write(mobyle_parser.tostring(wf))
        preprocessor = InterfacePreprocessor()
        preprocessor.process_interface(self.mobylexml_filepath)
        return

    def _merge_task_values(self, wf_tree):
        """
        Append the stored input values of each task (tasks file) to the
        matching ``flow/task`` element of wf_tree; no-op without tasks file.
        """
        if not os.path.exists(self.tasks_filepath):
            return
        tasks_root = etree.parse(self.tasks_filepath, parser).getroot()
        for wf_task_ele in wf_tree.findall("flow/task"):
            task_id = wf_task_ele.get('id')
            task_inp_values = tasks_root.find("task[@id='%s']" % task_id)
            if task_inp_values is not None:
                for inp in list(task_inp_values):
                    wf_task_ele.append(inp)

    @staticmethod
    def _order_tasks(wf):
        """
        Return the tasks of wf in depth-first order, starting from the tasks
        that are not the target of any link, so that generated parameters
        follow the data flow (for the sake of usability).
        """
        tasks = wf.tasks
        links = wf.links
        linked_targets = [link.to_task for link in links]
        ordered_tasks = []

        def visit(candidates):
            for next_task in candidates:
                if next_task not in ordered_tasks:
                    ordered_tasks.append(next_task)
                    successor_ids = [link.to_task for link in links if link.from_task == next_task.id]
                    visit([task for task in tasks if task.id in successor_ids])

        visit([task for task in tasks if task.id not in linked_targets])
        return ordered_tasks

    def _add_task_parameters(self, wf, task):
        """
        Generate the workflow-level parameters related to one task of wf:
        an input parameter (plus the link feeding the task) for each user
        input of its service that no link provides, grouped in a paragraph,
        and the description of the workflow outputs produced by the task.
        """
        service = parseService(registry.serversByName['local'].programsByName[task.service].path)
        # for each task create a paragraph to group its parameters
        paragraph = Paragraph()
        paragraph.name = task.id
        paragraph.prompt = "Task %s" % (task.description if task.description != '' else task.service)
        for input_parameter_name in service.getUserInputParameterByArgpos():
            fed_by_link = any(input_parameter_name == link.to_parameter and task.id == link.to_task
                              for link in wf.links)
            if fed_by_link:
                continue
            input_parameter = service.getParameter(input_parameter_name)
            parameter = self._build_input_parameter(task, input_parameter_name, input_parameter)
            paragraph.parameters = paragraph.parameters + [parameter]
            link = Link()
            link.to_parameter = input_parameter_name
            link.to_task = task.id
            link.from_parameter = parameter.id
            wf.links = wf.links + [link]
        if len(paragraph.parameters) > 0:
            wf.paragraphs = wf.paragraphs + [paragraph]
        # complete the description of workflow output parameters, i.e. links
        # leaving this task towards no other task
        for output_parameter_name in service.getUserOutputParameters():
            for link in wf.links:
                if (output_parameter_name == link.from_parameter and task.id == link.from_task
                        and link.to_task is None):
                    output_parameter = service.getParameter(output_parameter_name)
                    wf_output_parameter = [parameter for parameter in wf.parameters
                                           if parameter.id == link.to_parameter][0]
                    wf_output_parameter.prompt = output_parameter.getPrompt()
                    wf_output_parameter.type = Type()
                    wf_output_parameter.type.biotypes = [Biotype(bt_str) for bt_str in output_parameter.getBioTypes()]
                    wf_output_parameter.type.datatype = Datatype()
                    output_datatype = output_parameter.getDataType()
                    wf_output_parameter.type.datatype.class_name = output_datatype.name
                    # compare with this output's own datatype (not an input's)
                    if output_datatype.name != str(output_datatype.getRealName()):
                        wf_output_parameter.type.datatype.superclass_name = str(output_datatype.getRealName())

    @staticmethod
    def _build_input_parameter(task, input_parameter_name, input_parameter):
        """
        Build the workflow input Parameter exposing one input of a task.

        The attributes are set in the same order as the service parameter is
        described: name, prompt, type, id, flags, default values, value list.
        """
        parameter = Parameter()
        # name is task_id + "_" + parameter name to avoid collisions
        parameter.name = task.id + "_" + input_parameter.getName()
        parameter.prompt = input_parameter.getPrompt()
        parameter.type = Type()
        parameter.type.biotypes = [Biotype(bt_str) for bt_str in input_parameter.getBioTypes()]
        parameter.type.datatype = Datatype()
        input_datatype = input_parameter.getDataType()
        parameter.type.datatype.class_name = input_datatype.name
        if input_datatype.name != str(input_datatype.getRealName()):
            parameter.type.datatype.superclass_name = str(input_datatype.getRealName())
        parameter.id = parameter.name
        if input_parameter.ismandatory() and (len(input_parameter.getPreconds()) == 0):
            parameter.ismandatory = True
        if input_parameter.issimple():
            parameter.issimple = True
        # default values (vdef): values stored for the task, else service vdef
        vdefs = []
        for iv in task.input_values:
            if iv.name != input_parameter_name:
                continue
            value = Value()
            if iv.reference is not None:
                value.reference = iv.reference
                value.safe_name = iv.safe_name
                value.user_name = iv.user_name
            elif iv.value is not None:
                value.value = iv.value
            vdefs.append(value)
        if not vdefs:
            vdeflist = input_parameter.getVdef()
            if vdeflist:
                if isinstance(vdeflist, basestring):
                    vdeflist = [vdeflist]
                for vdef in vdeflist:
                    value = Value()
                    value.value = vdef
                    vdefs.append(value)
        if vdefs:
            parameter.vdef = vdefs
        # value list: vlist first, then flist (which replaces it if present)
        if input_parameter.hasVlist():
            vlist = Vlist()
            for key, vlist_value in input_parameter._vlist.items():
                velem = VElem()
                velem.label = key
                velem.value = vlist_value
                vlist.velems = vlist.velems + [velem]
            parameter.vlist = vlist
        if input_parameter.hasFlist():
            vlist = Vlist()
            for key, flist_value in input_parameter._flist.items():
                velem = VElem()
                velem.label = flist_value[0]
                velem.value = key
                vlist.velems = vlist.velems + [velem]
            parameter.vlist = vlist
        return parameter

    def get_graphml(self):
        """
        Get the contents of the GRAPHML file for the workflow
        """
        with open(self.graphml_filepath, 'r') as graphml_file:
            return graphml_file.read()

    @staticmethod
    def _set_graph_attributes(graphml_filepath, attributes):
        """
        Set the given (name, value) pairs, in order, as attributes of the
        ``graph`` element of a GRAPHML file, and rewrite the file.
        """
        with open(graphml_filepath, 'r') as source_file:
            root_tree = etree.parse(source_file)
        graph = root_tree.find('graph')
        for attr_name, attr_value in attributes:
            graph.set(attr_name, attr_value)
        with open(graphml_filepath, 'w') as target_file:
            target_file.write(etree.tostring(root_tree))

    def rename_wf(self, new_name, new_description=None):
        """
        Rename the workflow files and the ``userName`` of its graph,
        optionally change its description, then regenerate the Mobyle XML.

        Raises RenameError if a workflow named new_name already exists; the
        object keeps its old name in that case.
        """
        old_name = self.name
        old_graphml_filepath = self.graphml_filepath
        old_mobylexml_filepath = self.mobylexml_filepath
        old_tasks_filepath = self.tasks_filepath if os.path.exists(self.tasks_filepath) else None
        self.name = new_name
        if os.path.exists(self.graphml_filepath):
            # restore the name so the object still matches the files on disk
            self.name = old_name
            raise RenameError(old_name, new_name)
        os.rename(old_graphml_filepath, self.graphml_filepath)
        os.rename(old_mobylexml_filepath, self.mobylexml_filepath)
        if old_tasks_filepath is not None:
            os.rename(old_tasks_filepath, self.tasks_filepath)
        # rename in the graphml file directly as well
        attributes = [('userName', self.name)]
        if new_description is not None:
            attributes.append(('description', new_description))
        self._set_graph_attributes(self.graphml_filepath, attributes)
        self.graphml_to_mobylexml()

    def change_wf_description(self, new_description=None):
        """
        Change the workflow description (if new_description is not None),
        then regenerate the Mobyle XML.
        """
        attributes = []
        if new_description is not None:
            attributes.append(('description', new_description))
        self._set_graph_attributes(self.graphml_filepath, attributes)
        self.graphml_to_mobylexml()

    def copy_wf(self, dest_session):
        """
        Copy the workflow into another session.

        The copy is named after this workflow; if that name is already taken
        in the destination session, the suffixes ``_1`` to ``_10`` are tried
        in turn.

        @param dest_session: session receiving the copy
        @type dest_session: Session object
        @raise CopyError: if no free name could be found
        """
        dest_wf_dir = os.path.join(dest_session.getDir(), self.BMW_FOLDER)
        if not os.path.exists(dest_wf_dir):
            # in old authenticated sessions the workflow directory does not exist
            os.mkdir(dest_wf_dir, 0o755)
        workflow_class = type(self)
        for suffix in [''] + ['_%d' % i for i in range(1, 11)]:
            # Describe the copy without running __init__ (which would create
            # files), so that its paths come from the same properties the new
            # workflow object will use to find them.
            dest = workflow_class.__new__(workflow_class)
            dest.name = self.name + suffix
            dest.session = dest_session
            if os.path.exists(dest.graphml_filepath):
                continue
            shutil.copy(self.graphml_filepath, dest.graphml_filepath)
            shutil.copy(self.mobylexml_filepath, dest.mobylexml_filepath)
            if os.path.exists(self.tasks_filepath):
                shutil.copy(self.tasks_filepath, dest.tasks_filepath)
            self._set_graph_attributes(dest.graphml_filepath, [('userName', dest.name)])
            new_wf = workflow_class(dest.name, dest_session)
            new_wf.graphml_to_mobylexml()
            return
        raise CopyError(self.name)

    def delete(self):
        """
        Delete the workflow files (GRAPHML, Mobyle XML and tasks file if any)
        """
        os.remove(self.graphml_filepath)
        os.remove(self.mobylexml_filepath)
        if os.path.exists(self.tasks_filepath):
            os.remove(self.tasks_filepath)
        return

    def set_task_values(self, task_id, values):
        """
        Store the parameter values submitted for a task of the workflow.

        task_id -- id of the task in the workflow
        values -- submitted form values, providing ``keys()`` and
                  ``getvalue(name[, default])`` (presumably a cgi.FieldStorage
                  -- confirm with callers)

        The values are written in the tasks file as ``inputValue`` elements of
        a ``task`` element, replacing any values stored before for that task.
        A ``<param>.ref`` field denotes a file reference whose attributes come
        from the ``<param>.srcUrl``, ``<param>.mode`` and ``<param>.name``
        fields. Form bookkeeping fields are ignored. All submitted text is
        XML-escaped so that it cannot break the generated document.
        """
        from xml.sax.saxutils import escape

        def xml_text(raw):
            # escape &, <, > and double quotes (used as attribute delimiter)
            return escape('%s' % (raw,), {'"': '&quot;'})

        ignored_fields = ('action', 'form_submit', 'app_id', 'from_app', 'wf_name')
        ignored_markers = ('.srcFileName', '.name', '.mode', '.srcUrl')

        if os.path.exists(self.tasks_filepath):
            root = etree.parse(self.tasks_filepath, parser).getroot()
            task_inp_values = root.find("task[@id='%s']" % task_id)
            if task_inp_values is not None:
                root.remove(task_inp_values)
        else:
            root = etree.Element("tasks")
        chunks = ['<task id="%s">\n' % xml_text(task_id)]
        for field in values.keys():
            if field in ignored_fields or any(marker in field for marker in ignored_markers):
                continue
            valuelist = values.getvalue(field)
            if isinstance(valuelist, basestring):
                valuelist = [valuelist]
            # decided once per field so that every value of a reference field
            # is stored as a reference
            is_reference = '.ref' in field
            param_name = field.replace('.ref', '')
            for vdef in valuelist:
                if not vdef or len(vdef.strip()) == 0:
                    continue
                if is_reference:
                    chunks.append('<inputValue name="%s" reference="%s" mode="%s" userName="%s" safeName="%s"/>\n' % (
                        xml_text(param_name),
                        xml_text(values.getvalue(param_name + '.srcUrl', '')),
                        xml_text(values.getvalue(param_name + '.mode', '')),
                        xml_text(values.getvalue(param_name + '.name', '')),
                        xml_text(vdef)))
                else:
                    chunks.append('<inputValue name="%s">%s</inputValue>\n' % (xml_text(field), xml_text(vdef)))
        chunks.append('</task>\n')
        task_node = etree.fromstring(''.join(chunks), parser=parser)
        root.append(task_node)
        with open(self.tasks_filepath, 'wb') as tasks_file:
            tasks_file.write(etree.tostring(root))
        return

    def get_task_xml(self, task_id, service_pid):
        """
        Get the Mobyle XML of the service run by a task of the workflow,
        after having modified the default values (via ``task_xml.xsl``,
        which receives the task id and the workflow file path).

        task_id -- id of the task
        service_pid -- pid of the service; only used when the task is not
                       (yet) part of the saved workflow
        """
        wf = Parser().parse(self.mobylexml_filepath)
        for task in wf.tasks:
            if task.id == task_id:
                service_pid = task.service
                break
        service_path = get_service(service_pid).path
        msg = xslProcess('task_xml.xsl', service_path, None,
                         {'task_id': "'%s'" % task_id,
                          'workflow_url': "'%s'" % self.mobylexml_filepath})
        return msg