File: execute_task.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (37 lines) | stat: -rw-r--r-- 1,079 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import os

from parsl.serialize import unpack_res_spec_apply_message


def execute_task(bufs: bytes):
    """Deserialize a packed task, export its resource spec, and run it.

    ``bufs`` is unpacked with ``unpack_res_spec_apply_message`` into a
    callable, its positional arguments, its keyword arguments, and a
    resource specification.

    Before the callable is invoked, every entry of the resource
    specification is written to ``os.environ`` as ``PARSL_<NAME>``, where
    ``<NAME>`` is the upper-cased string form of the entry's key and the
    value is the string form of the entry's value.  These variables are set
    on the current process and are not removed after the task finishes.

    Returns:
        Whatever the callable returns.

    Raises:
        Any exception raised while unpacking the buffer or by the callable
        itself propagates to the caller unchanged.
    """
    f, args, kwargs, resource_spec = unpack_res_spec_apply_message(bufs)

    for varname in resource_spec:
        envname = "PARSL_" + str(varname).upper()
        os.environ[envname] = str(resource_spec[varname])

    # Invoke the task directly rather than via exec() in a copy of
    # locals(): the callable resolves names through its own __globals__, so
    # the extra namespace added nothing except a synthetic "<string>" frame
    # in every traceback.
    return f(*args, **kwargs)