File: cluster.py

package info (click to toggle)
pegasus-wms 4.4.0%2Bdfsg-4
  • links: PTS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 73,560 kB
  • ctags: 21,150
  • sloc: xml: 308,741; java: 101,268; python: 30,428; perl: 13,290; ansic: 7,830; sh: 6,843; cpp: 5,821; makefile: 803; sql: 431; php: 9
file content (53 lines) | stat: -rwxr-xr-x 2,247 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#!/usr/bin/env python

from Pegasus.DAX3 import ADAG, File, Link, Job, Executable, PFN, Profile
import sys
import os
import ConfigParser

# Two positional arguments are required: argv[1] is the Pegasus install
# prefix (used below to locate bin/pegasus-keg) and argv[2] is the directory
# that contains test.config.
if len(sys.argv) != 3:
    # Report on stderr: stdout is where the generated DAX is written.
    sys.stderr.write("Usage: %s PEGASUS_HOME CONFIG_DIR\n" % sys.argv[0])
    sys.exit(1)

# Load the test settings from <argv[2]>/test.config.  The dict handed to
# ConfigParser supplies fallback values for options the file leaves out.
config = ConfigParser.ConfigParser({
    'input_file': '',
    'workflow_name': 'horizontal-clustering-test',
    'executable_installed': "False",
    'clusters_size': "3",
    'clusters_maxruntime': "7",
})
config_path = sys.argv[2] + '/test.config'
# read() skips files it cannot open and returns the list of files it actually
# parsed; an empty list means nothing was loaded, so fail here with the path
# instead of letting a later config.get('all', ...) raise NoSectionError.
if not config.read(config_path):
    sys.stderr.write("Unable to read configuration file %s\n" % config_path)
    sys.exit(1)

# Build the abstract workflow (DAX), named by the 'workflow_name' option
workflow_name = config.get('all', 'workflow_name')
cluster = ADAG(workflow_name)

# Work out the directory holding the input file: the configured base
# directory extended with "<$USER>/inputs", or the current working directory
# when no base directory is configured.
input_file = config.get('all', 'input_file')
if input_file != '':
    input_file = input_file + '/' + os.getenv('USER') + '/inputs'
else:
    input_file = os.getcwd()

# Register f.a in the DAX-level replica catalog: its physical location is the
# configured URL prefix followed by the input directory, on the configured site
a = File("f.a")
file_url = config.get('all', 'file_url')
file_site = config.get('all', 'file_site')
a.addPFN(PFN(file_url + input_file + "/f.a", file_site))
cluster.addFile(a)

# Register the cluster::level1 and cluster::level2 transformations.  Both are
# backed by the same pegasus-keg binary under argv[1] and carry the same
# clustering profiles, so the shared settings are looked up once.
keg_installed = config.getboolean('all', 'executable_installed')
keg_url = config.get('all', 'executable_url') + sys.argv[1] + "/bin/pegasus-keg"
keg_site = config.get('all', 'executable_site')
clusters_size = config.get('all', 'clusters_size')
clusters_maxruntime = config.get('all', 'clusters_maxruntime')

for level in (1, 2):
    keg = Executable(
        namespace="cluster",
        name="level%d" % level,
        version="1.0",
        os="linux",
        arch="x86",
        installed=keg_installed,
    )
    keg.addPFN(PFN(keg_url, keg_site))
    keg.addProfile(Profile(namespace="pegasus", key="clusters.size", value=clusters_size))
    keg.addProfile(Profile(namespace="pegasus", key="clusters.maxruntime", value=clusters_maxruntime))
    cluster.addExecutable(keg)

# Build a two-level fan-out: four level1 jobs, each the parent of four level2
# jobs (16 level2 jobs in total).
for i in range(4):
    # The same number (1..4) is passed after -T and as the runtime profile.
    level1_runtime = str(i + 1)
    job = Job(namespace="cluster", name="level1", version="1.0")
    job.addArguments('-a level1 -T ' + level1_runtime)
    job.addArguments('-i', a)
    job.addProfile(Profile(namespace="pegasus", key="job.runtime", value=level1_runtime))
    job.uses(a, link=Link.INPUT)
    cluster.addJob(job)

    for j in range(4):
        # Level2 values are 2, 4, 6, 8.
        level2_runtime = str((j + 1) * 2)
        child = Job(namespace="cluster", name="level2", version="1.0")
        child.addArguments('-a level2 -T ' + level2_runtime)
        # NOTE(review): level1 jobs use the profile key "job.runtime" while
        # level2 jobs use "runtime"; presumably this exercises both spellings
        # on purpose -- confirm against the Pegasus clustering docs.
        child.addProfile(Profile(namespace="pegasus", key="runtime", value=level2_runtime))
        cluster.addJob(child)

        # Each level2 job runs only after the level1 job that spawned it.
        cluster.depends(parent=job, child=child)

# Emit the finished DAX as XML on standard output
cluster.writeXML(sys.stdout)