File: __init__.py

package info (click to toggle)
python-parsl 2026.02.09%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 12,144 kB
  • sloc: python: 24,446; makefile: 352; sh: 252; ansic: 45
file content (146 lines) | stat: -rw-r--r-- 6,066 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""Interfaces modeled after Python's `concurrent library <https://docs.python.org/3/library/concurrent.html>`_"""
import time
from concurrent.futures import Executor
from contextlib import AbstractContextManager
from typing import Callable, Dict, Iterable, Iterator, Literal, Optional, TypeVar
from warnings import warn

from parsl import Config, DataFlowKernel, load
from parsl.app.python import PythonApp

# Generic result type: ties the return type of the callable passed to
# ``ParslPoolExecutor.map`` to the element type of the iterator it returns.
T = TypeVar('T')


class ParslPoolExecutor(Executor, AbstractContextManager):
    """An executor that uses a pool of workers managed by Parsl

    Works just like a :class:`~concurrent.futures.ProcessPoolExecutor` except that tasks
    are distributed across workers that can be on different machines.

    The futures returned by :meth:`submit` and :meth:`map` are Parsl futures and will work
    with the same function chaining mechanisms as when using App-based Parsl.

    Parsl does not support canceling tasks. The :meth:`map` method does not cancel work
    when one member of the run fails or a timeout is reached
    and :meth:`shutdown` does not cancel work on completion.

    Once :meth:`shutdown` has been called, both :meth:`submit` and :meth:`map`
    raise :class:`RuntimeError`.
    """

    def __init__(self, config: Config | None = None, dfk: DataFlowKernel | None = None, executors: Literal['all'] | list[str] = 'all'):
        """Create the executor

        Exactly one of ``config`` or ``dfk`` must be provided.

        Args:
            config: Configuration for the Parsl Data Flow Kernel (DFK). When given,
                a DFK is loaded immediately and is cleaned up by :meth:`shutdown`.
            dfk: DataFlowKernel of an already-started parsl. It is used as-is and
                is never cleaned up by this class.
            executors: List of executors to use for supplied functions

        Raises:
            ValueError: If both, or neither, of ``config`` and ``dfk`` are given.
        """
        if (config is not None) and (dfk is not None):
            raise ValueError('Specify only one of config or dfk')
        if (config is None) and (dfk is None):
            raise ValueError('Must specify one of config or dfk')
        self._config = config
        self._app_cache: Dict[Callable, PythonApp] = {}  # Cache specific to this instance: https://stackoverflow.com/questions/33672412
        self._dfk = dfk
        self.executors = executors

        # Start workers immediately
        if self._config is not None:
            self._dfk = load(self._config)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Shut the executor down on leaving a ``with`` block, if this class owns the DFK.

        Nothing happens when the executor was already shut down, or when the DFK
        was supplied by the caller (it is managed elsewhere). Always returns
        ``None``, so exceptions raised inside the ``with`` block propagate.
        """
        if self._dfk is not None and self._config is not None:
            # The executors are being managed by this class, shut them down
            self.shutdown(wait=True)

    def _ensure_running(self) -> None:
        """Raise if the executor no longer holds a DataFlowKernel.

        Raises:
            RuntimeError: If :meth:`shutdown` has already been called.
        """
        if self._dfk is None:
            raise RuntimeError('Executor has been shut down.')

    @property
    def app_count(self):
        """Number of functions currently registered with the executor"""
        return len(self._app_cache)

    def get_app(self, fn: Callable) -> PythonApp:
        """Create a PythonApp for a function

        Apps are cached per executor instance, so repeated calls with the same
        function return the same app object.

        Args:
            fn: Function to be turned into a Parsl app
        Returns:
            PythonApp version of that function
        """
        if fn in self._app_cache:
            return self._app_cache[fn]
        app = PythonApp(fn, data_flow_kernel=self._dfk, executors=self.executors)
        self._app_cache[fn] = app
        return app

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as ``fn(*args, **kwargs)`` and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.

        Raises:
            RuntimeError: If the executor has been shut down.
        """
        self._ensure_running()
        app = self.get_app(fn)
        return app(*args, **kwargs)

    # TODO (wardlt): This override can go away when Parsl supports cancel
    def map(self, fn: Callable[..., T], *iterables: Iterable, timeout: Optional[float] = None,
            chunksize: int = 1, buffersize: Optional[int] = None) -> Iterator[T]:
        """Returns an iterator equivalent to map(fn, iter).

        All tasks are submitted before this method returns; only the retrieval
        of results is lazy.

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: This parameter is ignored. Caution should be exercised
                if expecting behaviour as documented in the base `concurrent.futures.Executor` class.
            buffersize: This parameter is ignored. Caution should be exercised
                if expecting behaviour as documented in the base `concurrent.futures.Executor` class.

        Returns:
            An iterator equivalent to: map(func, ``*iterables``) but the calls may
            be evaluated out-of-order.

        Raises:
            RuntimeError: If the executor has been shut down.
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If ``fn(*args)`` raises for any values.
        """
        # Same guard as `submit`: without it, work requested after shutdown would be
        # handed to an app that is no longer bound to this executor's live DFK.
        self._ensure_running()

        # This is a version of the CPython 3.9 `.map` implementation modified to not use `cancel`
        if timeout is not None:
            # A single deadline shared by every result, measured from the call to `map`
            end_time = timeout + time.monotonic()

        # Submit the applications
        app = self.get_app(fn)
        fs = [app(*args) for args in zip(*iterables)]

        # Yield the futures as completed
        def result_iterator():
            """Yield each task's result in submission order, honoring the shared deadline."""
            # Reverse so that popping from the end walks the futures in submission order
            fs.reverse()
            while fs:
                # Careful not to keep a reference to the popped future
                if timeout is None:
                    yield fs.pop().result()
                else:
                    yield fs.pop().result(end_time - time.monotonic())

        return result_iterator()

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        """Stop accepting work and, if this class owns the DFK, clean it up.

        Calling this on an executor that is already shut down does nothing.

        Args:
            wait: Whether to block until the tasks currently known to the DFK
                have finished before shutting down.
            cancel_futures: Not supported by Parsl. A warning is emitted if set
                and no task is canceled.
        """
        if self._dfk is None:
            return  # Do nothing. Nothing is active
        if cancel_futures:
            # stacklevel=2 attributes the warning to the caller of `shutdown`
            warn(message="Canceling on-going tasks is not supported in Parsl", stacklevel=2)
        if wait:
            self._dfk.wait_for_current_tasks()
        if self._config is not None:  # The executors are being managed
            self._dfk.cleanup()  # Shutdown the DFK
        # Dropping the reference marks the executor as shut down for `submit` and `map`
        self._dfk = None