File: queues.py

package info (click to toggle)
espresso 6.7-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 311,068 kB
  • sloc: f90: 447,429; ansic: 52,566; sh: 40,631; xml: 37,561; tcl: 20,077; lisp: 5,923; makefile: 4,503; python: 4,379; perl: 1,219; cpp: 761; fortran: 618; java: 568; awk: 128
file content (98 lines) | stat: -rw-r--r-- 3,871 bytes parent folder | download | duplicates (8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
'''
testcode2.queues
----------------

Access to external queueing systems.

:copyright: (c) 2012 James Spencer.
:license: modified BSD; see LICENSE for more details.
'''

import os.path
import subprocess
import sys
import time

import testcode2.exceptions as exceptions

class ClusterQueueJob:
    '''Interface to external queueing system.

:param string submit_file: filename of submit script to be submitted to the
    queueing system.
:param string system: name of queueing system.  Currently only an interface to
    PBS is implemented.
:raises RunError: if system is not a supported queueing system.
'''
    def __init__(self, submit_file, system='PBS'):
        # Identifier printed by the submit command; filled in by start_job.
        self.job_id = None
        self.submit_file = submit_file
        self.system = system
        if self.system == 'PBS':
            # Commands used to submit a job and to list the queue.
            self.submit_cmd = 'qsub'
            self.queue_cmd = 'qstat'
            # Indices of the whitespace-separated columns in the queue listing
            # holding the job id and the job state, plus the state value that
            # marks a job as finished.
            self.job_id_column = 0
            self.status_column = 4
            self.finished_status = 'C'
        else:
            err = 'Queueing system not implemented: %s' % self.system
            raise exceptions.RunError(err)

    def create_submit_file(self, pattern, string, template):
        '''Create a submit file.

Replace pattern in the template file with string and place the result in
self.submit_file.

:param string pattern: string in template to be replaced.
:param string string: string to replace pattern in template.
:param string template: filename of file containing the template submit script.
:raises RunError: if template does not exist.
'''
        # get template
        if not os.path.exists(template):
            err = 'Submit file template does not exist: %s.' % (template,)
            raise exceptions.RunError(err)
        # Context managers guarantee the handles are closed even if reading or
        # writing fails part-way through.
        with open(template) as ftemplate:
            submit = ftemplate.read()
        # replace marker with our commands
        submit = submit.replace(pattern, string)
        # write to submit script
        with open(self.submit_file, 'w') as fsubmit:
            fsubmit.write(submit)

    def start_job(self):
        '''Submit job to cluster queue.

The stripped, decoded output of the submit command is stored in self.job_id.

:raises RunError: if the submit command cannot be run or exits with a
    non-zero status.
'''
        submit_cmd = [self.submit_cmd, self.submit_file]
        try:
            submit_popen = subprocess.Popen(submit_cmd, stdout=subprocess.PIPE,
                                            stderr=subprocess.STDOUT)
            # communicate() drains the pipe while waiting for the process;
            # calling wait() first can deadlock once the pipe buffer fills.
            submit_out = submit_popen.communicate()[0]
        except OSError:
            # 'odd' syntax so exceptions work with python 2.5 and python 2.6/3.
            err = 'Error submitting job: %s' % (sys.exc_info()[1],)
            raise exceptions.RunError(err)
        output = submit_out.strip().decode('utf-8')
        if submit_popen.returncode != 0:
            # Without this check the error text would be stored as the job id
            # and wait() would return immediately, as if the job had finished.
            err = 'Error submitting job: %s' % (output,)
            raise exceptions.RunError(err)
        self.job_id = output

    def wait(self, poll_interval=15):
        '''Returns when job has finished running on the cluster.

:param float poll_interval: seconds to sleep before each query of the queue.
:raises RunError: if the queue command exits with a non-zero status.
'''
        # Don't ask the queueing system for the job itself but rather parse the
        # output from all current jobs and look for the job in question.
        # This works around the problem where the job_id is not a sufficient
        # handle to query the system directly (e.g. on the CMTH cluster).
        qstat_cmd = [self.queue_cmd]
        running = True
        while running:
            time.sleep(poll_interval)
            qstat_popen = subprocess.Popen(qstat_cmd, stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE)
            # communicate() rather than wait(): a listing of every job in the
            # queue can exceed the pipe buffer and deadlock a plain wait().
            qstat_out, qstat_err = qstat_popen.communicate()
            if qstat_popen.returncode != 0:
                err = ('Error inspecting queue system: %s' %
                       (qstat_err.decode('utf-8', 'replace').strip(),))
                raise exceptions.RunError(err)
            running = self._job_running(qstat_out)

    def _job_running(self, qstat_out):
        '''Return True if self.job_id is listed in qstat_out and not finished.

A job that does not appear in the listing is assumed to have finished.

:param qstat_out: output of the queue command, as raw bytes or decoded text.
'''
        if isinstance(qstat_out, bytes):
            # Popen pipes yield bytes but self.job_id is a decoded string;
            # under Python 3 bytes never compare equal to str.
            qstat_out = qstat_out.decode('utf-8', 'replace')
        min_words = max(self.job_id_column, self.status_column) + 1
        for line in qstat_out.splitlines():
            words = line.split()
            # Skip blank or short lines that cannot hold both columns, rather
            # than failing with an IndexError.
            if len(words) < min_words:
                continue
            if words[self.job_id_column] == self.job_id:
                return words[self.status_column] != self.finished_status
        return False