File: pbs_queue_run.py

""" Class for handling interaction with the PBS queuing system."""
from ase.io import write
import os
from ase.io.trajectory import Trajectory
from subprocess import Popen, PIPE
import time


class PBSQueueRun:

    """ Class for communicating with the commonly used PBS queuing
        system at a computer cluster.

        The user needs to supply a job file generator which takes as
        input a job name and the relative path to the traj file which
        is to be locally optimized, and returns the job script as text.
        If the traj file is called f, the job must write a file
        f[:-5] + '_done.traj', which is then read by this object. An
        example generator is sketched at the end of this file.

       Parameters:

       data_connection: The DataConnection object.
       tmp_folder: Temporary folder for all calculations.
       job_prefix: Prefix of the submitted jobs. This identifier is
           used to determine how many jobs are currently running.
       n_simul: The number of simultaneous jobs to keep in the queuing
           system.
       job_template_generator: The function generating the job file.
           This function should return the content of the job file as
           a string.
       qsub_command: The name of the qsub command (default qsub).
       qstat_command: The name of the qstat command (default qstat).
       find_neighbors: Optional function passed on to add_relaxed_step
           on the data connection (default None).
       perform_parametrization: Optional function passed on to
           add_relaxed_step on the data connection (default None).
    """
    def __init__(self, data_connection, tmp_folder, job_prefix,
                 n_simul, job_template_generator,
                 qsub_command='qsub', qstat_command='qstat',
                 find_neighbors=None, perform_parametrization=None):
        self.dc = data_connection
        self.job_prefix = job_prefix
        self.n_simul = n_simul
        self.job_template_generator = job_template_generator
        self.qsub_command = qsub_command
        self.qstat_command = qstat_command
        self.tmp_folder = tmp_folder
        self.find_neighbors = find_neighbors
        self.perform_parametrization = perform_parametrization
        self.__cleanup__()

    def relax(self, a):
        """ Add a structure to the queue. This method does not check
            whether sufficient jobs are already running; it simply
            submits the job. Use enough_jobs_running() beforehand if
            that check is needed. """
        self.__cleanup__()
        self.dc.mark_as_queued(a)
        if not os.path.isdir(self.tmp_folder):
            os.mkdir(self.tmp_folder)
        fname = '{0}/cand{1}.traj'.format(self.tmp_folder,
                                          a.info['confid'])
        write(fname, a)
        job_name = '{0}_{1}'.format(self.job_prefix, a.info['confid'])
        with open('tmp_job_file.job', 'w') as fd:
            fd.write(self.job_template_generator(job_name, fname))
        os.system('{0} tmp_job_file.job'.format(self.qsub_command))

    def enough_jobs_running(self):
        """ Determines if sufficient jobs are running. """
        return self.number_of_jobs_running() >= self.n_simul

    def number_of_jobs_running(self):
        """ Determines how many jobs are running. The user
            should use this or the enough_jobs_running method
            to verify that a job needs to be started before
            calling the relax method."""
        self.__cleanup__()
        p = Popen(['`which {0}` -u `whoami`'.format(self.qstat_command)],
                  shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                  close_fds=True, universal_newlines=True)
        lines = p.stdout.readlines()
        n_running = 0
        for line in lines:
            if self.job_prefix in line:
                n_running += 1
        return n_running
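
    # A sketch of the intended call pattern; `pbs_run` and `candidate`
    # are illustrative names, not part of this module:
    #
    #     if not pbs_run.enough_jobs_running():
    #         pbs_run.relax(candidate)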

    def __cleanup__(self):
        """ Tries to load in structures previously
            submitted to the queuing system. """
        confs = self.dc.get_all_candidates_in_queue()
        for c in confs:
            fdone = '{0}/cand{1}_done.traj'.format(self.tmp_folder, c)
            if os.path.isfile(fdone) and os.path.getsize(fdone) > 0:
                try:
                    # The job may still be writing the file; retry the
                    # read a few times before giving up.
                    a = []
                    niter = 0
                    while len(a) == 0 and niter < 5:
                        t = Trajectory(fdone, 'r')
                        a = [ats for ats in t]
                        if len(a) == 0:
                            time.sleep(1.)
                        niter += 1
                    if len(a) == 0:
                        txt = 'Could not read candidate ' + \
                            '{0} from the filesystem'.format(c)
                        raise IOError(txt)
                    a = a[-1]
                    a.info['confid'] = c
                    self.dc.add_relaxed_step(
                        a,
                        find_neighbors=self.find_neighbors,
                        perform_parametrization=self.perform_parametrization)
                except IOError as e:
                    print(e)
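

# The following is a minimal sketch of a job_template_generator
# honouring the contract described in the class docstring: it receives
# a job name and the relative path to the traj file, and returns the
# job script as a string. The queue name, walltime and the relaxation
# script 'relax.py' are illustrative assumptions, not part of ase;
# adapt them to your cluster.
def example_job_template_generator(job_name, traj_file):
    # The submitted job must leave behind traj_file[:-5] + '_done.traj'
    # for __cleanup__ to pick up.
    return '\n'.join(['#!/bin/sh',
                      '#PBS -N {0}'.format(job_name),
                      '#PBS -q small',  # assumed queue name
                      '#PBS -l walltime=01:00:00',
                      'cd $PBS_O_WORKDIR',
                      # relax.py is a hypothetical script that reads
                      # traj_file, optimizes the structure and writes
                      # traj_file[:-5] + '_done.traj'
                      'python relax.py {0}'.format(traj_file),
                      ''])

# Construction might then look like the following, with dc a
# DataConnection object and the folder and prefix names illustrative:
#
#     pbs_run = PBSQueueRun(dc, tmp_folder='tmp_folder',
#                           job_prefix='ga_opt', n_simul=5,
#                           job_template_generator=
#                           example_job_template_generator)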