File: python.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (84 lines) | stat: -rw-r--r-- 2,331 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import logging
import threading

import tblib.pickling_support

tblib.pickling_support.install()

from functools import wraps

from parsl.app.app import AppBase
from parsl.app.errors import wrap_error
from parsl.dataflow.dflow import DataFlowKernelLoader
from parsl.utils import AutoCancelTimer

logger = logging.getLogger(__name__)


def timeout(f, seconds: float):
    @wraps(f)
    def wrapper(*args, **kwargs):
        import ctypes

        import parsl.app.errors

        def inject_exception(thread):
            ctypes.pythonapi.PyThreadState_SetAsyncExc(
                ctypes.c_long(thread),
                ctypes.py_object(parsl.app.errors.AppTimeout)
            )

        thread = threading.current_thread().ident
        with AutoCancelTimer(seconds, inject_exception, args=[thread]):
            return f(*args, **kwargs)
    return wrapper


class PythonApp(AppBase):
    """Extends AppBase to cover the Python App."""

    def __init__(self, func, data_flow_kernel=None, cache=False, executors='all', ignore_for_cache=None, join=False):
        super().__init__(
            wrap_error(func),
            data_flow_kernel=data_flow_kernel,
            executors=executors,
            cache=cache,
            ignore_for_cache=ignore_for_cache
        )
        self.join = join

    def __call__(self, *args, **kwargs):
        """This is where the call to a python app is handled.

        Args:
             - Arbitrary
        Kwargs:
             - Arbitrary

        Returns:
                   App_fut

        """
        invocation_kwargs = {}
        invocation_kwargs.update(self.kwargs)
        invocation_kwargs.update(kwargs)

        if self.data_flow_kernel is None:
            dfk = DataFlowKernelLoader.dfk()
        else:
            dfk = self.data_flow_kernel

        walltime = invocation_kwargs.get('walltime')
        if walltime is not None:
            func = timeout(self.func, walltime)
        else:
            func = self.func

        app_fut = dfk.submit(func, app_args=args,
                             executors=self.executors,
                             cache=self.cache,
                             ignore_for_cache=self.ignore_for_cache,
                             app_kwargs=invocation_kwargs,
                             join=self.join)

        return app_fut