File: multiprocessingrun.py

package info (click to toggle)
python-ase 3.26.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 15,484 kB
  • sloc: python: 148,112; xml: 2,728; makefile: 110; javascript: 47
file content (61 lines) | stat: -rw-r--r-- 1,900 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# fmt: off

""" Class for handling several simultaneous jobs.
The class has been tested on Niflheim-opteron4.
"""
import time
import warnings
from multiprocessing import Pool

from ase.io import read, write


class MultiprocessingRun:
    """Class that allows for the simultaneous relaxation of
    several candidates on a cluster. Best used if each individual
    calculation is too small for using a queueing system.

    Parameters:

    data_connection: DataConnection object. Its ``mark_as_queued`` and
    ``add_relaxed_step`` methods are called with atoms objects.

    tmp_folder: Folder for temporary files. Each submitted candidate is
    written to ``<tmp_folder>/cand<confid>.traj``.

    n_simul: The number of simultaneous relaxations. Passed directly to
    ``multiprocessing.Pool``.

    relax_function: The relaxation function. It is called in a worker
    process with the filename of the candidate as its only argument.
    This needs to return the filename of the relaxed structure.

    Note: the worker pool is created in the constructor and is not
    closed by any method of this class.
    """

    def __init__(self, data_connection, relax_function,
                 tmp_folder, n_simul=None):
        self.dc = data_connection
        self.pool = Pool(n_simul)
        self.relax_function = relax_function
        self.tmp_folder = tmp_folder
        # AsyncResult handles of submitted jobs that have not been
        # collected yet.
        self.results = []

    def relax(self, a):
        """Relax the atoms object a by submitting the relaxation
        to the pool of cpus.

        The candidate is marked as queued in the database, written to
        the temporary folder and handed to the pool. Any jobs that have
        finished in the meantime are collected before returning.
        """
        self.dc.mark_as_queued(a)
        fname = f"{self.tmp_folder}/cand{a.info['confid']}.traj"
        write(fname, a)
        job = self.pool.apply_async(self.relax_function, [fname])
        self.results.append(job)
        self._cleanup()

    def _cleanup(self):
        """Collect every finished job and drop it from the pending list.

        Successful jobs have their relaxed structure read from the
        returned filename and added to the database. Failed jobs are
        dropped with a ``RuntimeWarning`` so that they cannot block
        ``finish_all`` forever. Jobs that are still running are kept.
        """
        # Iterate over a snapshot: removing from the list that is being
        # iterated would skip the element following each removed one.
        for job in list(self.results):
            if not job.ready():
                continue
            if job.successful():
                relaxed = read(job.get())
                self.dc.add_relaxed_step(relaxed)
            else:
                # get() re-raises whatever the worker raised, which can
                # be any exception type; capture it only for reporting.
                try:
                    job.get()
                except Exception as err:
                    warnings.warn(
                        'Relaxation job failed and was dropped: '
                        f'{err!r}',
                        RuntimeWarning, stacklevel=2)
            # Removed only after the job was fully handled, so a job
            # whose result could not be stored stays pending.
            self.results.remove(job)

    def finish_all(self):
        """Checks that all calculations are finished, if not
        wait and check again. Return when all are finished."""
        while self.results:
            self._cleanup()
            # Only wait when something is still running; no need to
            # sleep after the last job has been collected.
            if self.results:
                time.sleep(2.)