File: start_parallel_jobs.py

package info (click to toggle)
qiime 1.4.0-2
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 29,704 kB
  • sloc: python: 77,837; haskell: 379; sh: 113; makefile: 103
file content (124 lines) | stat: -rwxr-xr-x 4,576 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python
# File created on 10 Mar 2010
from __future__ import division

__author__ = "Greg Caporaso"
__copyright__ = "Copyright 2011, The QIIME Project"
__credits__ = ["Greg Caporaso"]
__license__ = "GPL"
__version__ = "1.4.0"
__maintainer__ = "Greg Caporaso"
__email__ = "gregcaporaso@gmail.com"
__status__ = "Release"
 

from qiime.util import make_option
from qiime.util import parse_command_line_parameters
from subprocess import Popen
from os import makedirs, chmod, getenv, remove
from os.path import exists
from shutil import rmtree
from stat import S_IRWXU
from qiime.util import load_qiime_config

# Configuration mapping returned by load_qiime_config(); main() reads its
# 'working_dir' entry to decide where the job files are written.
qiime_config = load_qiime_config()


# Description of the command-line interface; main() passes these entries as
# keyword arguments to parse_command_line_parameters().
script_info = {}
script_info['brief_description'] = "Starts multiple jobs in parallel on multicore or multiprocessor systems."
script_info['script_description'] = "This script is designed to start multiple jobs in parallel on systems with no queueing system, for example a multiple processor or multiple core laptop/desktop machine. This also serves as an example 'cluster_jobs' which users can use as a template to define scripts to start parallel jobs in their environment."
script_info['script_usage'] = [
 ("Example",
  "Start each command listed in test_jobs.txt in parallel. The run id for these jobs will be RUNID. ",
  "%prog -ms test_jobs.txt RUNID")]
script_info['output_description']= "No output is created."
script_info['required_options'] = []
# Both flags get an explicit default=False so that the '%default' placeholder
# in the help text renders a real value and the parsed attributes are always
# booleans (assumes make_option follows optparse semantics -- TODO confirm).
script_info['optional_options'] = [
 make_option('-m','--make_jobs',action='store_true',default=False,
         help='make the job files [default: %default]'),
 make_option('-s','--submit_jobs',action='store_true',default=False,
         help='submit the job files [default: %default]')
]
script_info['version'] = __version__
# The jobs file path and the run id are passed as positional arguments.
script_info['disallow_positional_arguments'] = False

def write_job_files(output_dir,commands,run_id):
    """Write each command to its own executable shell script.

    output_dir: directory under which a 'jobs/' subdirectory holds the
     scripts; the subdirectory is created if it does not exist yet.
    commands: iterable of strings, each a ';'-separated list of
     subcommands (for example the lines of an open jobs file).
    run_id: string used as the file-name prefix of every job script; the
     zero-based position of the command is appended to it.

    Returns (job_fps, paths_to_remove). job_fps lists the script paths in
     command order. paths_to_remove lists what a caller should delete to
     clean up: the jobs directory when this call created it, otherwise
     only the individual scripts (in that case it is the very same list
     object as job_fps).

    Raises OSError if the jobs directory cannot be created.
    """
    jobs_dir = '%s/jobs/' % output_dir
    job_fps = []
    if not exists(jobs_dir):
        try:
            makedirs(jobs_dir)
        except OSError as e:
            # The template is parenthesized as a whole before '%' is
            # applied: '%' binds tighter than '+', so formatting only the
            # last fragment with two values would raise a TypeError and
            # hide the real problem from the user.
            raise OSError(
             ("Error creating jobs directory in working dir: %s"
              " (specified in qiime_config). Do you have write access?. "
              "Original error message follows:\n%s")
             % (output_dir,str(e)))
        paths_to_remove = [jobs_dir]
    else:
        # point paths_to_remove at job_fps: the directory was not created
        # here, so only the scripts written below should be cleaned up
        paths_to_remove = job_fps

    # This is messy right now as our clusters (bmf, bmf2) require us to
    # start and exit a shell for some reason which we haven't figured out.
    # Running these commands as parallel shell scripts gets screwed up by
    # this. For the time being, I'm stripping this out here. Once the new
    # clusters are up, I'm going to move the wrapping of commands in
    # bash/exit to the cluster_jobs script. At that point this function
    # will be greatly simplified.
    ignored_subcommands = frozenset(['/bin/bash','exit'])

    for i,command in enumerate(commands):
        job_fp = '%s/%s%d' % (jobs_dir, run_id, i)
        # One subcommand per line; subcommands are compared stripped but
        # written exactly as they appeared in the command.
        script = '\n'.join([subcommand
         for subcommand in command.split(';')
         if subcommand.strip() not in ignored_subcommands])
        # 'with' closes the handle even if the write fails
        with open(job_fp,'w') as f:
            f.write(script)
        # owner-only read/write/execute so the script can be run
        chmod(job_fp, S_IRWXU)
        job_fps.append(job_fp)

    return job_fps, paths_to_remove

def run_commands(output_dir,commands,run_id,submit_jobs,keep_temp):
    """Write commands to job scripts, optionally start them and clean up.

    output_dir, commands, run_id: passed unchanged to write_job_files,
     which writes one shell script per command.
    submit_jobs: if true, start every job script with /bin/sh; the jobs
     are all launched without waiting for earlier ones to finish.
    keep_temp: if false, delete the paths that write_job_files reported
     as removable. In that case this function first waits for the
     started jobs to finish; with keep_temp true it returns as soon as
     the jobs have been launched.

    Returns None.
    """
    # Popen is not a big fan of how we join commands with semi-colons,
    # so each command is written to a shell script which is then called
    # by Popen
    job_fps, paths_to_remove = write_job_files(output_dir,commands,run_id)

    # Call the jobs, keeping the handles so cleanup can wait on them
    processes = []
    if submit_jobs:
        for job_fp in job_fps:
            processes.append(Popen(['/bin/sh', job_fp]))

    # clean up the shell scripts that were created
    if not keep_temp:
        # The jobs run asynchronously: deleting a script right after
        # launching it could remove the file before the shell has opened
        # it, so wait for every job before removing anything.
        for process in processes:
            process.wait()
        for p in paths_to_remove:
            try:
                # p is file
                remove(p)
            except OSError:
                # p is directory
                rmtree(p)
    return

def main():
    """Parse the command line and hand the jobs file to run_commands.

    Expects two positional arguments: the path to a file with one command
    per line, and the run id used to name the job scripts. Job scripts are
    written under qiime_config['working_dir'] (or './' when that entry is
    empty) and are always kept (keep_temp=True).

    Note that -m is only validated here: the job files are written whether
    or not it is passed, while -s controls whether the jobs are started.
    """
    option_parser, opts, args =\
       parse_command_line_parameters(**script_info)

    if opts.submit_jobs and not opts.make_jobs:
        option_parser.error('Must pass -m if passing -s. (Sorry about this, '+\
        'it\'s for backwards-compatibility.)')

    min_args = 2
    if len(args) < min_args:
        # Any arguments beyond the first two are ignored, so the message
        # names what is needed rather than claiming an exact count.
        option_parser.error('Two positional arguments are required: '+\
        'the path to the jobs file and the run id.')

    output_dir = qiime_config['working_dir'] or './'
    # run_commands reads every command while writing the job files, so the
    # jobs file can be closed as soon as it returns.
    with open(args[0]) as jobs_f:
        run_commands(output_dir,jobs_f,args[1],\
         submit_jobs=opts.submit_jobs,\
         keep_temp=True)


# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()