File: cluster_provider.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (138 lines) | stat: -rw-r--r-- 4,370 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import logging
from abc import abstractmethod
from string import Template

from parsl.launchers.base import Launcher
from parsl.launchers.errors import BadLauncher
from parsl.providers.base import ExecutionProvider
from parsl.providers.errors import SchedulerMissingArgs, ScriptPathError
from parsl.utils import execute_wait

logger = logging.getLogger(__name__)


class ClusterProvider(ExecutionProvider):
    """ This class defines behavior common to all cluster/supercompute-style scheduler systems.

    Parameters
    ----------
      label : str
        Label for this provider (exposed read-only via the ``label`` property).
      nodes_per_block, init_blocks, min_blocks, max_blocks, parallelism
        Block sizing / scaling settings. They are stored unchanged as
        same-named attributes; this class does not interpret them itself
        (presumably consumed by subclasses and the scaling strategy — confirm).
      walltime : str
        Walltime requested per block in HH:MM:SS.
      launcher : Launcher
        Launcher for this provider. Must be a ``Launcher`` instance,
        otherwise ``BadLauncher`` is raised.
      cmd_timeout : int
        Timeout for commands made to the scheduler in seconds. Defaults to 10;
        a single call can override it via ``execute_wait(cmd, timeout=...)``.

    .. code:: python

                                +------------------
                                |
          script_string ------->|  submit
               id      <--------|---+
                                |
          [ ids ]       ------->|  status
          [statuses]   <--------|----+
                                |
          [ ids ]       ------->|  cancel
          [cancel]     <--------|----+
                                |
                                +-------------------
    """

    def __init__(self,
                 label,
                 nodes_per_block,
                 init_blocks,
                 min_blocks,
                 max_blocks,
                 parallelism,
                 walltime,
                 launcher,
                 cmd_timeout=10):

        self._label = label
        self.nodes_per_block = nodes_per_block
        self.init_blocks = init_blocks
        self.min_blocks = min_blocks
        self.max_blocks = max_blocks
        self.parallelism = parallelism
        self.launcher = launcher
        self.walltime = walltime
        self.cmd_timeout = cmd_timeout
        if not isinstance(self.launcher, Launcher):
            raise BadLauncher(self.launcher)

        self.script_dir = None

        # Dictionary that keeps track of jobs, keyed on job_id
        self.resources = {}

    def execute_wait(self, cmd, timeout=None):
        """Run ``cmd`` via ``parsl.utils.execute_wait`` and return its result.

        Args:
              - cmd (string) : command to run
              - timeout (int or None) : seconds to wait; ``None`` means use
                ``self.cmd_timeout``
        """
        effective_timeout = self.cmd_timeout if timeout is None else timeout
        return execute_wait(cmd, effective_timeout)

    def _write_submit_script(self, template, script_filename, job_name, configs):
        """Generate submit script and write it to a file.

        The template is rendered with ``string.Template.substitute``: ``job_name``
        fills the ``$jobname`` placeholder and the entries of ``configs`` fill
        the rest.

        Args:
              - template (string) : The template string to be used for the writing submit script
              - script_filename (string) : Name of the submit script
              - job_name (string) : job name
              - configs (dict) : configs that get pushed into the template

        Returns:
              - None

        Raises:
              SchedulerMissingArgs : If template is missing args
              ScriptPathError : Unable to write submit script out
              Exception : Any other error is logged together with the template,
                          job name and configs, then re-raised unchanged
        """

        try:
            submit_script = Template(template).substitute(jobname=job_name, **configs)
            with open(script_filename, 'w') as f:
                f.write(submit_script)

        except KeyError as e:
            logger.error("Missing keys for submit script : %s", e)
            raise SchedulerMissingArgs(e.args, self.label) from e

        except IOError as e:
            logger.error("Failed writing to submit script: %s", script_filename)
            raise ScriptPathError(script_filename, e) from e

        except Exception as e:
            # Diagnostics go through the module logger rather than stdout so they
            # end up alongside the rest of this provider's log output.
            logger.error("Uncategorized error: %s", e)
            logger.error("Template : %s", template)
            logger.error("Args : %s", job_name)
            logger.error("Kwargs : %s", configs)
            # Bare raise re-raises the active exception with its traceback intact.
            raise

    @abstractmethod
    def _status(self):
        """Subclass hook invoked by ``status()`` before reading ``self.resources``.

        NOTE(review): presumably refreshes ``self.resources[job_id]['status']``
        from the scheduler — confirm against concrete providers.
        """
        pass

    def status(self, job_ids):
        """ Get the status of a list of jobs identified by the job identifiers
        returned from the submit request.

        ``_status()`` is only called when ``job_ids`` is non-empty; an empty
        list returns ``[]`` without contacting the scheduler.

        Args:
             - job_ids (list) : A list of job identifiers

        Returns:
             - A list of JobStatus objects corresponding to each job_id in the job_ids list.

        Raises:
             - ExecutionProviderException or its subclasses
             - KeyError if a job_id is not tracked in ``self.resources``

        """
        if job_ids:
            self._status()
        return [self.resources[jid]['status'] for jid in job_ids]

    @property
    def label(self):
        """Label supplied at construction time."""
        return self._label