1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
import logging
import os
from enum import IntEnum
from typing import Optional
logger = logging.getLogger(__name__)
class JobState(IntEnum):
"""Defines a set of states that a job can be in"""
UNKNOWN = 0
"""The batch provider is unable to determinate a state for this job"""
PENDING = 1
""""This job is in the batch queue but has not started running"""
RUNNING = 2
"""This job is running in the batch system"""
CANCELLED = 3
"""This job has been cancelled"""
COMPLETED = 4
"""This job has completed"""
FAILED = 5
"""This job has failed"""
TIMEOUT = 6
"""This job has ended due to walltime expiry. This is different from
other error states, because in the pilot job model, a timeout is usually
expected and not a failure. Timeouts should not be reported with FAILED
state: if they are reported as FAILED, Parsl's error handling code
in `simple_error_handler` will eventually regard the batch system as
broken and shut down.
"""
HELD = 7
"""This job is held/suspended in the batch system"""
MISSING = 8
"""This job has reached a terminal state without the resources(managers/workers)
launched in the job connecting back to the Executor. This state is set by HTEX
when it is able to infer that the block failed to start workers for eg due to
bad worker environment or network connectivity issues.
"""
SCALED_IN = 9
"""This job has been deliberately scaled in. Scaling code should not be concerned
that the job never ran (for example for error handling purposes).
"""
def __str__(self) -> str:
return f"{self.__class__.__name__}.{self.name}"
TERMINAL_STATES = [JobState.CANCELLED, JobState.COMPLETED, JobState.FAILED,
JobState.TIMEOUT, JobState.MISSING, JobState.SCALED_IN]
class JobStatus:
"""Encapsulates a job state together with other details:
Args:
state: The machine-readable state of the job this status refers to
message: Optional human readable message
exit_code: Optional exit code
stdout_path: Optional path to a file containing the job's stdout
stderr_path: Optional path to a file containing the job's stderr
"""
SUMMARY_TRUNCATION_THRESHOLD = 2048
def __init__(self, state: JobState, message: Optional[str] = None, exit_code: Optional[int] = None,
stdout_path: Optional[str] = None, stderr_path: Optional[str] = None):
self.state = state
self.message = message
self.exit_code = exit_code
self.stdout_path = stdout_path
self.stderr_path = stderr_path
@property
def terminal(self) -> bool:
return self.state in TERMINAL_STATES
@property
def status_name(self) -> str:
return self.state.name
def __repr__(self) -> str:
if self.message is not None:
extra = f"state={self.state} message={self.message}"
else:
extra = f"state={self.state}"
return f"<{type(self).__module__}.{type(self).__qualname__} object at {hex(id(self))}, {extra}>"
def __str__(self) -> str:
if self.message is not None:
return f"{self.state} ({self.message})"
else:
return f"{self.state}"
@property
def stdout(self) -> Optional[str]:
return self._read_file(self.stdout_path)
@property
def stderr(self) -> Optional[str]:
return self._read_file(self.stderr_path)
def _read_file(self, path: Optional[str]) -> Optional[str]:
if path is None:
return None
try:
with open(path, 'r') as f:
return f.read()
except Exception:
logger.exception("Converting exception to None")
return None
@property
def stdout_summary(self) -> Optional[str]:
return self._read_summary(self.stdout_path)
@property
def stderr_summary(self) -> Optional[str]:
return self._read_summary(self.stderr_path)
def _read_summary(self, path: Optional[str]) -> Optional[str]:
if not path:
# can happen for synthetic job failures
return None
try:
with open(path, 'r') as f:
f.seek(0, os.SEEK_END)
size = f.tell()
f.seek(0, os.SEEK_SET)
if size > JobStatus.SUMMARY_TRUNCATION_THRESHOLD:
half_threshold = int(JobStatus.SUMMARY_TRUNCATION_THRESHOLD / 2)
head = f.read(half_threshold)
f.seek(size - half_threshold, os.SEEK_SET)
tail = f.read(half_threshold)
return head + '\n...\n' + tail
else:
f.seek(0, os.SEEK_SET)
return f.read()
except FileNotFoundError:
# When output is redirected to a file, but the process does not produce any output
# bytes, no file is actually created. This handles that case.
return None
|