File: local.py

Package: python-parsl 2025.01.13+ds-1

import logging
import os
import time

from parsl.jobs.states import JobState, JobStatus
from parsl.launchers import SingleNodeLauncher
from parsl.providers.base import ExecutionProvider
from parsl.providers.errors import (
    SchedulerMissingArgs,
    ScriptPathError,
    SubmitException,
)
from parsl.utils import RepresentationMixin, execute_wait

logger = logging.getLogger(__name__)


class LocalProvider(ExecutionProvider, RepresentationMixin):
    """ Local Execution Provider

    This provider provisions execution resources on the local host.

    Parameters
    ----------

    nodes_per_block : int
        Nodes to provision per block. Default is 1.
    launcher : Launcher
        Launcher used to wrap the command. Default is SingleNodeLauncher().
    init_blocks : int
        Number of blocks to provision at the time of initialization. Default is 1.
    min_blocks : int
        Minimum number of blocks to maintain.
    max_blocks : int
        Maximum number of blocks to maintain.
    parallelism : float
        Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive
        scaling where as many resources as possible are used; parallelism close to 0 represents
        the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
    worker_init : str
        Command to be run before starting a worker, such as 'module load Anaconda; source activate env'.
    cmd_timeout : int
        Timeout in seconds applied to commands run by this provider (job launch,
        status probe and cancel). Default is 30.
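
    Example
    -------
    A minimal configuration sketch using this provider; the executor label and
    block counts below are illustrative choices, not required values::

        from parsl.config import Config
        from parsl.executors import HighThroughputExecutor
        from parsl.providers import LocalProvider

        config = Config(
            executors=[
                HighThroughputExecutor(
                    label="local_htex",
                    provider=LocalProvider(init_blocks=1, max_blocks=2),
                ),
            ],
        )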
    """

    def __init__(self,
                 nodes_per_block=1,
                 launcher=SingleNodeLauncher(),
                 init_blocks=1,
                 min_blocks=0,
                 max_blocks=1,
                 worker_init='',
                 cmd_timeout=30,
                 parallelism=1):
        self._label = 'local'
        self.nodes_per_block = nodes_per_block
        self.launcher = launcher
        self.worker_init = worker_init
        self.init_blocks = init_blocks
        self.min_blocks = min_blocks
        self.max_blocks = max_blocks
        self.parallelism = parallelism
        self.script_dir = None
        self.cmd_timeout = cmd_timeout

        # Dictionary that keeps track of jobs, keyed on job_id
        self.resources = {}

    def status(self, job_ids):
        '''  Get the status of a list of jobs identified by their ids.

        Args:
            - job_ids (List of ids) : List of identifiers for the jobs

        Returns:
            - List of JobStatus objects, one per job id, in the same order as job_ids.

        '''

        for job_id in self.resources:
            # This job dict should really be a class on its own
            job_dict = self.resources[job_id]
            if job_dict['status'] and job_dict['status'].terminal:
                # We already checked this and it can't change after that
                continue
            script_path = job_dict['script_path']

            alive = self._is_alive(job_dict)
            str_ec = self._read_job_file(script_path, '.ec').strip()

            status = None
            if str_ec == '-':
                if alive:
                    status = JobStatus(JobState.RUNNING)
                else:
                    # not alive but didn't get to write an exit code
                    if 'cancelled' in job_dict:
                        # because we cancelled it
                        status = JobStatus(JobState.CANCELLED)
                    else:
                        # we didn't cancel it, so it must have been killed by something outside
                        # parsl; we don't have a state for this, but we'll use CANCELLED with
                        # a specific message
                        status = JobStatus(JobState.CANCELLED, message='Killed')
            else:
                try:
                    # TODO: ensure that these files are only read once and clean them
                    ec = int(str_ec)
                    stdout_path = self._job_file_path(script_path, '.out')
                    stderr_path = self._job_file_path(script_path, '.err')
                    if ec == 0:
                        state = JobState.COMPLETED
                    else:
                        state = JobState.FAILED
                    status = JobStatus(state, exit_code=ec,
                                       stdout_path=stdout_path, stderr_path=stderr_path)
                except Exception:
                    status = JobStatus(JobState.FAILED,
                                       'Cannot parse exit code: {}'.format(str_ec))

            job_dict['status'] = status

        return [self.resources[jid]['status'] for jid in job_ids]

    def _is_alive(self, job_dict):
        '''Check whether the job's process is still running by probing its PID with ps.'''
        retcode, stdout, stderr = execute_wait(
            'ps -p {} > /dev/null 2> /dev/null; echo "STATUS:$?" '.format(
                job_dict['remote_pid']), self.cmd_timeout)
        for line in stdout.split('\n'):
            if line.startswith("STATUS:"):
                status = line.split("STATUS:")[1].strip()
                if status == "0":
                    return True
                else:
                    return False

    def _job_file_path(self, script_path: str, suffix: str) -> str:
        path = '{0}{1}'.format(script_path, suffix)
        return path

    def _read_job_file(self, script_path: str, suffix: str) -> str:
        path = self._job_file_path(script_path, suffix)

        with open(path, 'r') as f:
            return f.read()

    def _write_submit_script(self, script_string, script_filename):
        '''
        Write the given script string out to the submit script file.

        Args:
              - script_string (string) : The script content to write to the submit script file
              - script_filename (string) : Name of the submit script

        Returns:
              - True: on success

        Raises:
              SchedulerMissingArgs : If the script is missing required arguments
              ScriptPathError : If the submit script cannot be written out
        '''

        try:
            with open(script_filename, 'w') as f:
                f.write(script_string)

        except KeyError as e:
            logger.error("Missing keys for submit script: %s", e)
            raise SchedulerMissingArgs(e.args, self.label)

        except IOError as e:
            logger.error("Failed writing to submit script: %s", script_filename)
            raise ScriptPathError(script_filename, e)

        return True

    def submit(self, command, tasks_per_node, job_name="parsl.localprovider"):
        ''' Submits the command as a job run on the local host.
        Submit returns an ID that corresponds to the task that was just submitted.

        The command is wrapped with the configured launcher, which starts
        tasks_per_node invocations of the command per node.

        Args:
             - command (String) : Command line invocation to be run for this job.
             - tasks_per_node (int) : command invocations to be launched per node

        Kwargs:
             - job_name (String): Name for job, must be unique

        Returns:
             - job_id: (string) Identifier for the job

        Raises:
             - SubmitException: If the launch command fails or the PID of the
               launched process cannot be retrieved.

        '''

        job_name = "{0}.{1}".format(job_name, time.time())

        # Set script path
        script_path = "{0}/{1}.sh".format(self.script_dir, job_name)
        script_path = os.path.abspath(script_path)

        wrap_command = self.worker_init + f'\nexport JOBNAME={job_name}\n' + self.launcher(command, tasks_per_node, self.nodes_per_block)

        self._write_submit_script(wrap_command, script_path)

        job_id = None
        remote_pid = None

        logger.debug("Launching")
        # We need to capture the exit code and the streams, so we put them in files. We also write
        # '-' to the exit code file to isolate potential problems with writing to files in the
        # script directory
        #
        # The basic flow is:
        #   1. write "-" to the exit code file. If this fails, exit
        #   2. Launch the following sequence in the background:
        #      a. the command to run
        #      b. write the exit code of the command from (a) to the exit code file
        #   3. Write the PID of the background sequence on stdout. The PID is needed if we want to
        #      cancel the task later.
        #
        # We need to do the >/dev/null 2>&1 so that bash closes stdout, otherwise
        # execute_wait hangs reading the process stdout until all the
        # background commands complete.
        cmd = '/bin/bash -c \'echo - >{0}.ec && {{ {{ bash {0} 1>{0}.out 2>{0}.err ; ' \
              'echo $? > {0}.ec ; }} >/dev/null 2>&1 & echo "PID:$!" ; }}\''.format(script_path)
        retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout)
        if retcode != 0:
            raise SubmitException(job_name, "Launch command exited with code {0}".format(retcode),
                                  stdout, stderr)
        for line in stdout.split('\n'):
            if line.startswith("PID:"):
                remote_pid = line.split("PID:")[1].strip()
                job_id = remote_pid
        if job_id is None:
            raise SubmitException(job_name, "Channel failed to start remote command/retrieve PID")

        self.resources[job_id] = {'job_id': job_id, 'status': JobStatus(JobState.RUNNING),
                                  'remote_pid': remote_pid, 'script_path': script_path}

        return job_id

    def cancel(self, job_ids):
        ''' Cancels the jobs specified by a list of job ids

        Args:
        job_ids : [<job_id> ...]

        Returns: [True]  Always returns true for every job_id, regardless of
                         whether an individual cancel failed (unless an
                         exception is raised)
        '''
        for job in job_ids:
            job_dict = self.resources[job]
            job_dict['cancelled'] = True
            logger.debug("Terminating job/process ID: {0}".format(job))
            cmd = "kill -- -$(ps -o pgid= {} | grep -o '[0-9]*')".format(job_dict['remote_pid'])
            retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout)
            if retcode != 0:
                logger.warning("Failed to kill PID: {} and child processes on {}".format(job_dict['remote_pid'],
                                                                                         self.label))

        rets = [True for i in job_ids]
        return rets

    @property
    def label(self):
        return self._label

    @property
    def status_polling_interval(self):
        return 5