1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
"""Interfaces modeled after Python's `concurrent library <https://docs.python.org/3/library/concurrent.html>`_"""
import time
from concurrent.futures import Executor
from contextlib import AbstractContextManager
from typing import Callable, Dict, Iterable, Iterator, Literal, Optional, TypeVar
from warnings import warn
from parsl import Config, DataFlowKernel, load
from parsl.app.python import PythonApp
# Return type of the function passed to ``ParslPoolExecutor.map``; ties ``Callable[..., T]`` to ``Iterator[T]``
T = TypeVar('T')
class ParslPoolExecutor(Executor, AbstractContextManager):
    """An executor that uses a pool of workers managed by Parsl

    Works just like a :class:`~concurrent.futures.ProcessPoolExecutor` except that tasks
    are distributed across workers that can be on different machines.
    The futures returned by :meth:`submit` and :meth:`map` are Parsl futures and will work
    with the same function chaining mechanisms as when using App-based Parsl.

    Parsl does not support canceling tasks. The :meth:`map` method does not cancel work
    when one member of the run fails or a timeout is reached
    and :meth:`shutdown` does not cancel work on completion.
    """

    def __init__(self, config: Config | None = None, dfk: DataFlowKernel | None = None,
                 executors: Literal['all'] | list[str] = 'all'):
        """Create the executor

        Exactly one of ``config`` or ``dfk`` must be provided.

        Args:
            config: Configuration for the Parsl Data Flow Kernel (DFK)
            dfk: DataFlowKernel of an already-started parsl
            executors: List of executors to use for supplied functions

        Raises:
            ValueError: If both or neither of ``config`` and ``dfk`` are given.
        """
        if (config is not None) and (dfk is not None):
            raise ValueError('Specify only one of config or dfk')
        if (config is None) and (dfk is None):
            raise ValueError('Must specify one of config or dfk')

        self._config = config
        # Cache specific to this instance: https://stackoverflow.com/questions/33672412
        self._app_cache: Dict[Callable, PythonApp] = {}
        self._dfk = dfk
        self.executors = executors

        # Start workers immediately
        if self._config is not None:
            self._dfk = load(self._config)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the context, shutting down only a DFK that this class started

        Nothing happens when there is no active DFK or when the DFK was supplied
        by the caller (i.e., no ``config`` was given) and is managed elsewhere.
        """
        if self._dfk is not None and self._config is not None:
            # The executors are being managed by this class, shut them down
            self.shutdown(wait=True)

    @property
    def app_count(self):
        """Number of functions currently registered with the executor"""
        return len(self._app_cache)

    def get_app(self, fn: Callable) -> PythonApp:
        """Create a PythonApp for a function

        Apps are cached per function, so repeated calls with the same
        function return the same app.

        Args:
            fn: Function to be turned into a Parsl app
        Returns:
            PythonApp version of that function
        """
        if fn in self._app_cache:
            return self._app_cache[fn]
        app = PythonApp(fn, data_flow_kernel=self._dfk, executors=self.executors)
        self._app_cache[fn] = app
        return app

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as ``fn(*args, **kwargs)`` and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        Raises:
            RuntimeError: If the executor has been shut down.
        """
        if self._dfk is None:
            raise RuntimeError('Executor has been shut down.')
        app = self.get_app(fn)
        return app(*args, **kwargs)

    # TODO (wardlt): This override can go away when Parsl supports cancel
    def map(self, fn: Callable[..., T], *iterables: Iterable, timeout: Optional[float] = None,
            chunksize: int = 1, buffersize: Optional[int] = None) -> Iterator[T]:
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: This parameter is ignored. Caution should be exercised
                if expecting behaviour as documented in the base `concurrent.futures.Executor` class.
            buffersize: This parameter is ignored. Caution should be exercised
                if expecting behaviour as documented in the base `concurrent.futures.Executor` class.
        Returns:
            An iterator equivalent to: map(func, ``*iterables``) but the calls may
            be evaluated out-of-order.
        Raises:
            RuntimeError: If the executor has been shut down.
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If ``fn(*args)`` raises for any values.
        """
        # Refuse new work after shutdown, consistent with `submit`. Without this check
        # an app would be created with ``data_flow_kernel=None`` instead of this executor's DFK.
        if self._dfk is None:
            raise RuntimeError('Executor has been shut down.')

        # This is a version of the CPython 3.9 `.map` implementation modified to not use `cancel`
        if timeout is not None:
            end_time = timeout + time.monotonic()

        # Submit the applications
        app = self.get_app(fn)
        fs = [app(*args) for args in zip(*iterables)]

        # Yield the futures as completed
        def result_iterator():
            # reverse to keep finishing order
            fs.reverse()
            while fs:
                # Careful not to keep a reference to the popped future
                if timeout is None:
                    yield fs.pop().result()
                else:
                    yield fs.pop().result(end_time - time.monotonic())

        return result_iterator()

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        """Stop accepting work and release the DFK

        Does nothing if there is no active DFK. The DFK is only cleaned up
        when it was created by this class from a ``config``; a DFK supplied
        by the caller is merely detached from this executor.

        Args:
            wait: Whether to wait for the DFK's current tasks before returning
            cancel_futures: Not supported by Parsl; a warning is emitted if set
        """
        if self._dfk is None:
            return  # Do nothing. Nothing is active
        if cancel_futures:
            warn(message="Canceling on-going tasks is not supported in Parsl")
        if wait:
            self._dfk.wait_for_current_tasks()
        if self._config is not None:  # The executors are being managed
            self._dfk.cleanup()  # Shutdown the DFK
        self._dfk = None
|