File: __init__.py

"""Interfaces modeled after Python's `concurrent library <https://docs.python.org/3/library/concurrent.html>`_"""
import time
from concurrent.futures import Executor
from contextlib import AbstractContextManager
from typing import Callable, Dict, Iterable, Iterator, Literal, Optional
from warnings import warn

from parsl import Config, DataFlowKernel, load
from parsl.app.python import PythonApp


class ParslPoolExecutor(Executor, AbstractContextManager):
    """An executor that uses a pool of workers managed by Parsl

    Works just like a :class:`~concurrent.futures.ProcessPoolExecutor` except that tasks
    are distributed across workers that can be on different machines.

    Create a new executor using one of two methods:

    1. Supplying a Parsl :class:`~parsl.Config` that defines how to create new workers.
       The executor will start a new Parsl Data Flow Kernel (DFK) when it is entered as a context manager.

    2. Supplying an already-started Parsl :class:`~parsl.DataFlowKernel` (DFK).
       The executor assumes you will start and stop the Parsl DFK outside the Executor, as sketched below.
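
    A minimal sketch of the second mode (``my_parsl_config`` stands in for
    your own configuration):

    .. code-block:: python

        from parsl import load

        dfk = load(my_parsl_config)
        with ParslPoolExecutor(dfk=dfk) as executor:
            ...  # Submit tasks as usual
        dfk.cleanup()  # The executor does not stop a DFK it did not start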

    The futures returned by :meth:`submit` and :meth:`map` are Parsl futures and will work
    with the same function chaining mechanisms as when using Parsl with decorators.

    .. code-block:: python

        def f(x):
            return x + 1

        @python_app
        def parity(x):
            return 'odd' if x % 2 == 1 else 'even'

        with ParslPoolExecutor(config=my_parsl_config) as executor:
            future_1 = executor.submit(f, 1)
            assert parity(future_1).result() == 'even'  # Function chaining, as expected

            future_2 = executor.submit(f, future_1)
            assert future_2.result() == 3  # Chaining works with `submit` too

    Parsl does not support canceling tasks. The :meth:`map` method does not cancel work
    when one member of the run fails or a timeout is reached,
    and :meth:`shutdown` does not cancel work on completion.
    """

    def __init__(self, config: Config | None = None, dfk: DataFlowKernel | None = None, executors: Literal['all'] | list[str] = 'all'):
        """Create the executor

        Args:
            config: Configuration for the Parsl Data Flow Kernel (DFK)
            dfk: DataFlowKernel of an already-started parsl
            executors: List of executors to use for supplied functions
        """
        if (config is not None) and (dfk is not None):
            raise ValueError('Specify only one of config or dfk')
        if (config is None) and (dfk is None):
            raise ValueError('Must specify one of config or dfk')
        self._config = config
        self._app_cache: Dict[Callable, PythonApp] = {}  # Cache specific to this instance: https://stackoverflow.com/questions/33672412
        self._dfk = dfk
        self.executors = executors

        # Start workers immediately
        if self._config is not None:
            self._dfk = load(self._config)

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._dfk is None:  # Nothing has been started, do nothing
            return
        elif self._config is not None:  # The executors are being managed by this class, shut them down
            self.shutdown(wait=True)
            return
        else:  # The DFK is managed elsewhere, do nothing
            return

    @property
    def app_count(self):
        """Number of functions currently registered with the executor"""
        return len(self._app_cache)

    def get_app(self, fn: Callable) -> PythonApp:
        """Create a PythonApp for a function

        Args:
            fn: Function to be turned into a Parsl app
        Returns:
            PythonApp version of that function
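
        Apps are cached per executor instance, so repeated calls with the same
        function return the same app (illustrative; ``my_fn`` stands in for
        any plain Python function):

        .. code-block:: python

            app = executor.get_app(my_fn)
            assert app is executor.get_app(my_fn)  # Served from the cache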
        """
        if fn in self._app_cache:
            return self._app_cache[fn]
        app = PythonApp(fn, data_flow_kernel=self._dfk, executors=self.executors)
        self._app_cache[fn] = app
        return app

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as ``fn(*args, **kwargs)`` and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
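
        A minimal illustration (assumes an active executor; ``pow`` is the
        Python built-in):

        .. code-block:: python

            future = executor.submit(pow, 2, 10)
            assert future.result() == 1024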
        """

        if self._dfk is None:
            raise RuntimeError('Executor has been shut down.')
        app = self.get_app(fn)
        return app(*args, **kwargs)

    # TODO (wardlt): This override can go away when Parsl supports cancel
    def map(self, fn: Callable, *iterables: Iterable, timeout: Optional[float] = None, chunksize: int = 1) -> Iterator:
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: Accepted for compatibility with
                :meth:`concurrent.futures.Executor.map` but ignored;
                items are always submitted to Parsl one at a time.

        Returns:
            An iterator equivalent to ``map(fn, *iterables)``, except that the
            calls may be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If ``fn(*args)`` raises for any values.
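
        A minimal illustration (assumes an active executor):

        .. code-block:: python

            for value in executor.map(abs, [-1, 2, -3]):
                print(value)  # Prints 1, 2, 3; results keep argument order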
        """
        # This is a version of the CPython 3.9 `.map` implementation modified to not use `cancel`
        if timeout is not None:
            end_time = timeout + time.monotonic()

        # Submit the applications
        app = self.get_app(fn)
        fs = [app(*args) for args in zip(*iterables)]

        # Yield the futures as completed
        def result_iterator():
            # reverse to keep finishing order
            fs.reverse()
            while fs:
                # Careful not to keep a reference to the popped future
                if timeout is None:
                    yield fs.pop().result()
                else:
                    yield fs.pop().result(end_time - time.monotonic())

        return result_iterator()

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
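        """Signal the executor to stop accepting tasks and release resources

        If this executor started the DFK itself (i.e., it was created from a
        :class:`~parsl.Config`), the DFK is shut down as well; an
        externally-supplied DFK is left running.

        Args:
            wait: If True, block until all currently-running tasks complete.
            cancel_futures: Accepted for compatibility with
                :meth:`concurrent.futures.Executor.shutdown`; Parsl cannot
                cancel tasks, so a warning is issued if this is True.
        """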
        if self._dfk is None:
            return  # Do nothing. Nothing is active
        if cancel_futures:
            warn(message="Canceling ongoing tasks is not supported in Parsl")
        if wait:
            self._dfk.wait_for_current_tasks()
        if self._config is not None:  # The executors are being managed
            self._dfk.cleanup()  # Shutdown the DFK
        self._dfk = None