File: app.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (197 lines) | stat: -rw-r--r-- 8,532 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""Definitions for the @App decorator and the App classes.

The App class encapsulates a generic leaf task that can be executed asynchronously.
"""
import logging
from abc import ABCMeta, abstractmethod
from inspect import signature
from typing import Any, Callable, Dict, List, Optional, Sequence, Union

import typeguard
from typing_extensions import Literal

from parsl.dataflow.dflow import DataFlowKernel
from parsl.dataflow.futures import AppFuture

logger = logging.getLogger(__name__)


class AppBase(metaclass=ABCMeta):
    """This is the base class that defines the two external facing functions that an App must define.

    The  __init__ () which is called when the interpreter sees the definition of the decorated
    function, and the __call__ () which is invoked when a decorated function is called by the user.

    """

    @typeguard.typechecked
    def __init__(self, func: Callable,
                 data_flow_kernel: Optional[DataFlowKernel] = None,
                 executors: Union[List[str], Literal['all']] = 'all',
                 cache: bool = False,
                 ignore_for_cache: Optional[Sequence[str]] = None) -> None:
        """Construct the App object.

        Args:
             - func (function): Takes the function to be made into an App

        Kwargs:
             - data_flow_kernel (DataFlowKernel): The :class:`~parsl.dataflow.dflow.DataFlowKernel` responsible for
               managing this app. This can be omitted only
               after calling :meth:`parsl.dataflow.dflow.DataFlowKernelLoader.load`.
             - executors (str|list) : Labels of the executors that this app can execute over. Default is 'all'.
             - cache (Bool) : Enable caching of this app ?
             - ignore_for_cache (sequence|None): Names of arguments which will be ignored by the caching mechanism.

        Returns:
             - App object.

        """
        self.__name__ = func.__name__
        self.func = func
        self.data_flow_kernel = data_flow_kernel
        self.executors = executors
        self.cache = cache
        self.ignore_for_cache = ignore_for_cache

        params = signature(func).parameters

        self.kwargs: Dict[str, Any]
        self.kwargs = {}
        if 'stdout' in params:
            self.kwargs['stdout'] = params['stdout'].default
        if 'stderr' in params:
            self.kwargs['stderr'] = params['stderr'].default
        if 'walltime' in params:
            self.kwargs['walltime'] = params['walltime'].default
        if 'parsl_resource_specification' in params:
            self.kwargs['parsl_resource_specification'] = params['parsl_resource_specification'].default
        if 'outputs' in params:
            self.kwargs['outputs'] = params['outputs'].default
        if 'inputs' in params:
            self.kwargs['inputs'] = params['inputs'].default

    @abstractmethod
    def __call__(self, *args: Any, **kwargs: Any) -> AppFuture:
        pass


@typeguard.typechecked
def python_app(function: Optional[Callable] = None,
               data_flow_kernel: Optional[DataFlowKernel] = None,
               cache: bool = False,
               executors: Union[List[str], Literal['all']] = 'all',
               ignore_for_cache: Optional[Sequence[str]] = None) -> Callable:
    """Decorator function for making python apps.

    Parameters
    ----------
    function : function
        Do not pass this keyword argument directly. This is needed in order to allow for omitted parenthesis,
        for example, ``@python_app`` if using all defaults or ``@python_app(walltime=120)``. If the
        decorator is used alone, function will be the actual function being decorated, whereas if it
        is called with arguments, function will be None. Default is None.
    data_flow_kernel : DataFlowKernel
        The :class:`~parsl.dataflow.dflow.DataFlowKernel` responsible for managing this app. This can
        be omitted only after calling :meth:`parsl.dataflow.dflow.DataFlowKernelLoader.load`. Default is None.
    executors : string or list
        Labels of the executors that this app can execute over. Default is 'all'.
    cache : bool
        Enable caching of the app call. Default is False.
    ignore_for_cache : (sequence|None)
        Names of arguments which will be ignored by the caching mechanism.
    """
    from parsl.app.python import PythonApp

    def decorator(func: Callable) -> Callable:
        def wrapper(f: Callable) -> PythonApp:
            return PythonApp(f,
                             data_flow_kernel=data_flow_kernel,
                             cache=cache,
                             executors=executors,
                             ignore_for_cache=ignore_for_cache,
                             join=False)
        return wrapper(func)
    if function is not None:
        return decorator(function)
    return decorator


@typeguard.typechecked
def join_app(function: Optional[Callable] = None,
             data_flow_kernel: Optional[DataFlowKernel] = None,
             cache: bool = False,
             ignore_for_cache: Optional[Sequence[str]] = None) -> Callable:
    """Decorator function for making join apps

    Parameters
    ----------
    function : function
        Do not pass this keyword argument directly. This is needed in order to allow for omitted parenthesis,
        for example, ``@python_app`` if using all defaults or ``@python_app(walltime=120)``. If the
        decorator is used alone, function will be the actual function being decorated, whereas if it
        is called with arguments, function will be None. Default is None.
    data_flow_kernel : DataFlowKernel
        The :class:`~parsl.dataflow.dflow.DataFlowKernel` responsible for managing this app. This can
        be omitted only after calling :meth:`parsl.dataflow.dflow.DataFlowKernelLoader.load`. Default is None.
    cache : bool
        Enable caching of the app call. Default is False.
    ignore_for_cache : (sequence|None)
        Names of arguments which will be ignored by the caching mechanism.
    """
    from parsl.app.python import PythonApp

    def decorator(func: Callable) -> Callable:
        def wrapper(f: Callable) -> PythonApp:
            return PythonApp(f,
                             data_flow_kernel=data_flow_kernel,
                             cache=cache,
                             executors=["_parsl_internal"],
                             ignore_for_cache=ignore_for_cache,
                             join=True)
        return wrapper(func)
    if function is not None:
        return decorator(function)
    return decorator


@typeguard.typechecked
def bash_app(function: Optional[Callable] = None,
             data_flow_kernel: Optional[DataFlowKernel] = None,
             cache: bool = False,
             executors: Union[List[str], Literal['all']] = 'all',
             ignore_for_cache: Optional[Sequence[str]] = None) -> Callable:
    """Decorator function for making bash apps.

    Parameters
    ----------
    function : function
        Do not pass this keyword argument directly. This is needed in order to allow for omitted parenthesis,
        for example, ``@bash_app`` if using all defaults or ``@bash_app(walltime=120)``. If the
        decorator is used alone, function will be the actual function being decorated, whereas if it
        is called with arguments, function will be None. Default is None.
    data_flow_kernel : DataFlowKernel
        The :class:`~parsl.dataflow.dflow.DataFlowKernel` responsible for managing this app. This can
        be omitted only after calling :meth:`parsl.dataflow.dflow.DataFlowKernelLoader.load`. Default is None.
    walltime : int
        Walltime for app in seconds. Default is 60.
    executors : string or list
        Labels of the executors that this app can execute over. Default is 'all'.
    cache : bool
        Enable caching of the app call. Default is False.
    ignore_for_cache : (list|None)
        Names of arguments which will be ignored by the caching mechanism.
    """
    from parsl.app.bash import BashApp

    def decorator(func: Callable) -> Callable:
        def wrapper(f: Callable) -> BashApp:
            return BashApp(f,
                           data_flow_kernel=data_flow_kernel,
                           cache=cache,
                           executors=executors,
                           ignore_for_cache=ignore_for_cache)
        return wrapper(func)
    if function is not None:
        return decorator(function)
    return decorator