File: grid_engine.py

import logging
import os
import time

from parsl.jobs.states import JobState, JobStatus
from parsl.launchers import SingleNodeLauncher, SrunLauncher, SrunMPILauncher
from parsl.providers.cluster_provider import ClusterProvider
from parsl.providers.grid_engine.template import template_string
from parsl.utils import RepresentationMixin

logger = logging.getLogger(__name__)

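# Lower-cased `qstat` state codes mapped to Parsl job states; codes not listed
# here are treated as JobState.UNKNOWN when parsed in _status() below.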
translate_table = {
    'qw': JobState.PENDING,
    'hqw': JobState.PENDING,
    'hrwq': JobState.PENDING,
    'r': JobState.RUNNING,
    's': JobState.FAILED,  # suspended
    'ts': JobState.FAILED,
    't': JobState.FAILED,  # Suspended by alarm
    'eqw': JobState.FAILED,  # Error states
    'ehqw': JobState.FAILED,  # ..
    'ehrqw': JobState.FAILED,  # ..
    'd': JobState.COMPLETED,
    'dr': JobState.COMPLETED,
    'dt': JobState.COMPLETED,
    'drt': JobState.COMPLETED,
    'ds': JobState.COMPLETED,
    'drs': JobState.COMPLETED,
}


class GridEngineProvider(ClusterProvider, RepresentationMixin):
    """A provider for the Grid Engine scheduler.

    Parameters
    ----------
    nodes_per_block : int
        Nodes to provision per block.
    init_blocks : int
        Number of blocks to provision at the start of the run.
    min_blocks : int
        Minimum number of blocks to maintain.
    max_blocks : int
        Maximum number of blocks to maintain.
    parallelism : float
        Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive
        scaling where as many resources as possible are used; parallelism close to 0 represents
        the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
    walltime : str
        Walltime requested per block in HH:MM:SS.
    scheduler_options : str
        String to prepend to the #$$ blocks in the submit script to the scheduler.
    worker_init : str
        Command to be run before starting a worker, such as 'module load Anaconda; source activate env'.
    launcher : Launcher
        Launcher for this provider. Possible launchers include
        :class:`~parsl.launchers.SingleNodeLauncher` (the default).
    cmd_timeout : int
        Timeout, in seconds, for commands sent to the scheduler.
    queue : str
        Name of the Grid Engine queue to submit jobs to (passed to ``qsub -q``).
        If None, the scheduler's default queue is used.
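
    Example
    -------
    A minimal configuration sketch (assuming access to a Grid Engine cluster;
    the queue name, walltime, and worker_init command are illustrative)::

        from parsl.config import Config
        from parsl.executors import HighThroughputExecutor
        from parsl.providers import GridEngineProvider

        config = Config(
            executors=[
                HighThroughputExecutor(
                    label='sge_htex',
                    provider=GridEngineProvider(
                        queue='all.q',               # hypothetical queue name
                        nodes_per_block=1,
                        init_blocks=1,
                        max_blocks=4,
                        walltime='01:00:00',
                        worker_init='module load anaconda',  # site-specific setup
                    ),
                )
            ]
        )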
    """

    def __init__(self,
                 nodes_per_block=1,
                 init_blocks=1,
                 min_blocks=0,
                 max_blocks=1,
                 parallelism=1,
                 walltime="00:10:00",
                 scheduler_options='',
                 worker_init='',
                 launcher=SingleNodeLauncher(),
                 cmd_timeout: int = 60,
                 queue=None):
        label = 'grid_engine'
        super().__init__(label,
                         nodes_per_block,
                         init_blocks,
                         min_blocks,
                         max_blocks,
                         parallelism,
                         walltime,
                         launcher,
                         cmd_timeout=cmd_timeout)
        self.scheduler_options = scheduler_options
        self.worker_init = worker_init
        self.queue = queue

        if isinstance(launcher, (SrunLauncher, SrunMPILauncher)):
            logger.warning("The %s launcher is usually appropriate for Slurm providers. "
                           "Consider SingleNodeLauncher or AprunLauncher instead.",
                           type(launcher).__name__)

    def get_configs(self, command, tasks_per_node):
        """Compose a dictionary with information for writing the submit script."""

        logger.debug("Requesting one block with {} nodes per block and {} tasks per node".format(
            self.nodes_per_block, tasks_per_node))

        job_config = {}
        job_config["submit_script_dir"] = self.script_dir
        job_config["nodes"] = self.nodes_per_block
        job_config["walltime"] = self.walltime
        job_config["scheduler_options"] = self.scheduler_options
        job_config["worker_init"] = self.worker_init
        job_config["user_script"] = command

        job_config["user_script"] = self.launcher(command,
                                                  tasks_per_node,
                                                  self.nodes_per_block)
        return job_config

    def submit(self, command, tasks_per_node, job_name="parsl.sge"):
        ''' Submit a command to the scheduler, typically to start a pilot job
        that will host workers.

        Args:
             - command (str) : The bash command string to be executed
             - tasks_per_node (int) : Number of command invocations to be launched per node

        KWargs:
             - job_name (str) : Human friendly name to be assigned to the job request

        Returns:
             - A job identifier string parsed from ``qsub -terse`` output, or None
               if submission failed

        Raises:
             - ExecutionProviderException or its subclasses
        '''

        # Set job name
        job_name = "{0}.{1}".format(job_name, time.time())

        # Set script path
        script_path = "{0}/{1}.submit".format(self.script_dir, job_name)
        script_path = os.path.abspath(script_path)

        job_config = self.get_configs(command, tasks_per_node)

        logger.debug("Writing submit script")
        self._write_submit_script(template_string, script_path, job_name, job_config)

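        # `qsub -terse` limits qsub's stdout to the bare job id (for array jobs,
        # "<id>.<task-range>"), which is what the parsing below relies on.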
        if self.queue is not None:
            cmd = "qsub -q {0} -terse {1}".format(self.queue, script_path)
        else:
            cmd = "qsub -terse {0}".format(script_path)
        retcode, stdout, stderr = self.execute_wait(cmd)

        if retcode == 0:
            for line in stdout.split('\n'):
                job_id = line.strip()
                if not job_id:
                    continue
                self.resources[job_id] = {'job_id': job_id, 'status': JobStatus(JobState.PENDING)}
                return job_id
        else:
            logger.error("Submit command failed")
            logger.error("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())

    def _status(self):
        ''' Poll ``qstat`` and update the status of all jobs tracked in
        ``self.resources`` in place.

        Jobs that are no longer reported by ``qstat`` are marked COMPLETED.

        Raises:
             - ExecutionProviderException or its subclasses

        '''

        cmd = "qstat"

        retcode, stdout, stderr = self.execute_wait(cmd)

        # execute_wait failed; do not update job statuses
        if retcode != 0:
            return

        jobs_missing = list(self.resources.keys())
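        # Illustrative `qstat` output parsed below (fields are whitespace-separated;
        # the state code is the fifth field):
        #
        #   job-ID  prior   name      user  state submit/start at     queue  slots
        #   ---------------------------------------------------------------------
        #    12345  0.55500 parsl.sge alice r     01/13/2025 10:00:00 all.q      1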
        for line in stdout.split('\n'):
            parts = line.split()
            if parts and parts[0].lower() != 'job-id' \
                    and not parts[0].startswith('----'):
                job_id = parts[0]
                state = translate_table.get(parts[4].lower(), JobState.UNKNOWN)
                if job_id in self.resources:
                    self.resources[job_id]['status'] = JobStatus(state)
                    jobs_missing.remove(job_id)

        # Mark jobs that have disappeared from qstat output as completed;
        # this may lose information about why a job actually failed.
        for missing_job in jobs_missing:
            self.resources[missing_job]['status'] = JobStatus(JobState.COMPLETED)

    def cancel(self, job_ids):
        ''' Cancel the jobs identified by the given job ids using ``qdel``.

        Args:
             - job_ids (list): A list of job identifiers

        Returns:
             - A list of booleans, one per job id, indicating whether the cancel
               request succeeded

        Raises:
             - ExecutionProviderException or its subclasses
        '''

        job_id_list = ' '.join(job_ids)
        cmd = "qdel {}".format(job_id_list)
        retcode, stdout, stderr = self.execute_wait(cmd)

        if retcode == 0:
            for jid in job_ids:
                self.resources[jid]['status'] = JobStatus(JobState.COMPLETED)
            rets = [True for _ in job_ids]
        else:
            rets = [False for _ in job_ids]

        return rets

    @property
    def status_polling_interval(self):
        return 60