1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
|
import logging
from typing import Callable, Iterable, Optional, Sequence, Union
import typeguard
from typing_extensions import Literal
from parsl.dataflow.dependency_resolvers import DependencyResolver
from parsl.dataflow.taskrecord import TaskRecord
from parsl.errors import ConfigurationError
from parsl.executors.base import ParslExecutor
from parsl.executors.threads import ThreadPoolExecutor
from parsl.monitoring import MonitoringHub
from parsl.usage_tracking.api import UsageInformation
from parsl.usage_tracking.levels import DISABLED as USAGE_TRACKING_DISABLED
from parsl.usage_tracking.levels import LEVEL_3 as USAGE_TRACKING_LEVEL_3
from parsl.utils import RepresentationMixin
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Config(RepresentationMixin, UsageInformation):
    """
    Specification of Parsl configuration options.

    Parameters
    ----------
    executors : sequence of ParslExecutor, optional
        List (or other iterable) of `ParslExecutor` instances to use for executing tasks.
        Default is (:class:`~parsl.executors.threads.ThreadPoolExecutor()`,).
    app_cache : bool, optional
        Enable app caching. Default is True.
    checkpoint_files : sequence of str, optional
        List of paths to checkpoint files. See :func:`parsl.utils.get_all_checkpoints` and
        :func:`parsl.utils.get_last_checkpoint` for helpers. Default is None.
    checkpoint_mode : str, optional
        Checkpoint mode to use, can be ``'dfk_exit'``, ``'task_exit'``, ``'periodic'`` or ``'manual'``.
        If set to `None`, checkpointing will be disabled. Default is None.
    checkpoint_period : str, optional
        Time interval (in "HH:MM:SS") at which to checkpoint completed tasks. Only has an effect if
        ``checkpoint_mode='periodic'``. If ``checkpoint_mode='periodic'`` and no period is given,
        ``"00:30:00"`` is used.
    dependency_resolver : DependencyResolver, optional
        Plugin point for custom dependency resolvers. Default: only resolve Futures,
        using the `SHALLOW_DEPENDENCY_RESOLVER`.
    exit_mode : str, optional
        When Parsl is used as a context manager (using ``with parsl.load`` syntax) then this parameter
        controls what will happen to running tasks and exceptions at exit. The options are:

        * ``cleanup``: cleanup the DFK on exit without waiting for any tasks
        * ``skip``: skip all shutdown behaviour when exiting the context manager
        * ``wait``: wait for all tasks to complete when exiting normally, but exit immediately when exiting due to an exception.

        Default is ``cleanup``.
    garbage_collect : bool, optional
        Delete task records from DFK when tasks have completed. Default: True
    internal_tasks_max_threads : int, optional
        Maximum number of threads to allocate for submit side internal tasks such as some data transfers
        or @joinapps.
        Default is 10.
    monitoring : MonitoringHub, optional
        The config to use for database monitoring. Default is None which does not log to a database.
    retries : int, optional
        Set the number of retries (or available retry budget when using retry_handler) in case of failure. Default is 0.
    retry_handler : function, optional
        A user pluggable handler to decide if/how a task retry should happen.
        If no handler is specified, then each task failure incurs a retry cost
        of 1.
    run_dir : str, optional
        Path to run directory. Default is 'runinfo'.
    std_autopath : function, optional
        Sets the function used to generate stdout/stderr specifications when parsl.AUTO_LOGPATH is used. If no function
        is specified, generates paths that look like: ``rundir/NNN/task_logs/X/task_{id}_{name}{label}.{out/err}``
    strategy : str, optional
        Strategy to use for scaling blocks according to workflow needs. Can be 'simple', 'htex_auto_scale', 'none'
        or `None`.
        If 'none' or `None`, dynamic scaling will be disabled. Default is 'simple'. The literal value `None` is
        deprecated.
    strategy_period : float or int, optional
        How often the scaling strategy should be executed. Default is 5 seconds.
    max_idletime : float, optional
        The maximum idle time allowed for an executor before strategy could shut down unused blocks. Default is 120.0 seconds.
    usage_tracking : int, optional
        Set this field to 1, 2, or 3 to opt-in to Parsl's usage tracking system.
        The value represents the level of usage tracking detail to be collected.
        Setting this field to 0 will disable usage tracking. Default (this field is not set): usage tracking is not enabled.
        Parsl only collects minimal, non-personally-identifiable
        information used for reporting to our funding agencies.
    project_name : str, optional
        Option to deanonymize usage tracking data.
        If set, this value will be used as the project name in the usage tracking data and placed on the leaderboard.
    initialize_logging : bool, optional
        Make DFK optionally not initialize any logging. Log messages
        will still be passed into the python logging system under the
        ``parsl`` logger name, but the logging system will not by default
        perform any further log system configuration. Most noticeably,
        it will not create a parsl.log logfile. The use case for this
        is when parsl is used as a library in a bigger system which
        wants to configure logging in a way that makes sense for that
        bigger system as a whole.

    Raises
    ------
    ConfigurationError
        If two or more executors share a label, or if ``usage_tracking`` is
        outside the supported range of levels.
    """

    @typeguard.typechecked
    def __init__(self,
                 executors: Optional[Iterable[ParslExecutor]] = None,
                 app_cache: bool = True,
                 checkpoint_files: Optional[Sequence[str]] = None,
                 checkpoint_mode: Union[None,
                                        Literal['task_exit'],
                                        Literal['periodic'],
                                        Literal['dfk_exit'],
                                        Literal['manual']] = None,
                 checkpoint_period: Optional[str] = None,
                 dependency_resolver: Optional[DependencyResolver] = None,
                 exit_mode: Literal['cleanup', 'skip', 'wait'] = 'cleanup',
                 garbage_collect: bool = True,
                 internal_tasks_max_threads: int = 10,
                 retries: int = 0,
                 retry_handler: Optional[Callable[[Exception, TaskRecord], float]] = None,
                 run_dir: str = 'runinfo',
                 std_autopath: Optional[Callable] = None,
                 strategy: Optional[str] = 'simple',
                 strategy_period: Union[float, int] = 5,
                 max_idletime: float = 120.0,
                 monitoring: Optional[MonitoringHub] = None,
                 usage_tracking: int = 0,
                 project_name: Optional[str] = None,
                 initialize_logging: bool = True) -> None:

        # Materialize into a tuple so that any iterable (including a one-shot
        # generator) can be validated and iterated more than once. An empty or
        # missing value falls back to a single default ThreadPoolExecutor.
        executors = tuple(executors or [])
        if not executors:
            executors = (ThreadPoolExecutor(),)
        self._executors: Sequence[ParslExecutor] = executors
        self._validate_executors()

        self.app_cache = app_cache
        self.checkpoint_files = checkpoint_files
        self.checkpoint_mode = checkpoint_mode
        if checkpoint_period is not None:
            # A period is only meaningful for periodic checkpointing; tell the
            # user (at debug level) when the requested value will be ignored.
            if checkpoint_mode is None:
                logger.debug('The requested `checkpoint_period=%s` will have no effect because `checkpoint_mode=None`',
                             checkpoint_period)
            elif checkpoint_mode != 'periodic':
                logger.debug("Requested checkpoint period of %s only has an effect with checkpoint_mode='periodic'",
                             checkpoint_period)
        if checkpoint_mode == 'periodic' and checkpoint_period is None:
            # Default period used when periodic checkpointing is requested
            # without an explicit interval.
            checkpoint_period = "00:30:00"
        self.checkpoint_period = checkpoint_period
        self.dependency_resolver = dependency_resolver
        self.exit_mode = exit_mode
        self.garbage_collect = garbage_collect
        self.internal_tasks_max_threads = internal_tasks_max_threads
        self.retries = retries
        self.retry_handler = retry_handler
        self.run_dir = run_dir
        self.strategy = strategy
        self.strategy_period = strategy_period
        self.max_idletime = max_idletime
        self.validate_usage_tracking(usage_tracking)
        self.usage_tracking = usage_tracking
        self.project_name = project_name
        self.initialize_logging = initialize_logging
        self.monitoring = monitoring
        self.std_autopath: Optional[Callable] = std_autopath

    @property
    def executors(self) -> Sequence[ParslExecutor]:
        """The configured executors, as materialized during construction."""
        return self._executors

    def _validate_executors(self) -> None:
        """Check that at least one executor is configured and that labels are unique.

        Raises
        ------
        ConfigurationError
            If there are no executors, or if two or more executors share a
            label. Each repeated label is reported once, in the order in which
            its first repeat is encountered.
        """
        if not self.executors:
            raise ConfigurationError('At least one executor must be specified')

        seen = set()
        duplicates = []
        for label in (e.label for e in self.executors):
            # Only record a label the first time it repeats, so a label used
            # three or more times is not listed more than once in the error.
            if label in seen and label not in duplicates:
                duplicates.append(label)
            seen.add(label)

        if duplicates:
            listed = ', '.join(f'label={d!r}' for d in duplicates)
            raise ConfigurationError(f'Executors must have unique labels ({listed})')

    def validate_usage_tracking(self, level: int) -> None:
        """Ensure ``level`` lies between the disabled and highest usage-tracking levels.

        Raises
        ------
        ConfigurationError
            If ``level`` is below ``USAGE_TRACKING_DISABLED`` or above
            ``USAGE_TRACKING_LEVEL_3``.
        """
        if not USAGE_TRACKING_DISABLED <= level <= USAGE_TRACKING_LEVEL_3:
            raise ConfigurationError(
                f"Usage Tracking values must be 0, 1, 2, or 3 and not {level}"
            )

    def get_usage_information(self):
        """Return the configuration facts reported to usage tracking.

        The dict holds the number of executors and whether a custom
        dependency resolver was supplied.
        """
        return {"executors_len": len(self.executors),
                "dependency_resolver": self.dependency_resolver is not None}
|