File: rpex_worker.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (62 lines) | stat: -rw-r--r-- 1,830 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import sys

import radical.pilot as rp

import parsl.app.errors as pe
from parsl.app.bash import remote_side_bash_executor
from parsl.executors.execute_task import execute_task
from parsl.serialize import serialize, unpack_res_spec_apply_message


class ParslWorker:

    def _dispatch_func(self, task):

        try:
            buffer = rp.utils.deserialize_bson(task['description']['function'])
            result = execute_task(buffer)
            val = str(serialize(result, buffer_threshold=1000000))
            exc = (None, None)
            ret = 0
            out = None
            err = None
        except Exception:
            val = None
            exc = (rp.utils.serialize_bson(pe.RemoteExceptionWrapper(*sys.exc_info())), None)
            ret = 1
            out = None
            err = None

        return out, err, ret, val, exc

    def _dispatch_proc(self, task):

        try:
            buffer = rp.utils.deserialize_bson(task['description']['executable'])
            func, args, kwargs, _resource_spec = unpack_res_spec_apply_message(buffer)
            ret = remote_side_bash_executor(func, *args, **kwargs)
            exc = (None, None)
            val = None
            out = None
            err = None
        except Exception:
            val = None
            exc = (rp.utils.serialize_bson(pe.RemoteExceptionWrapper(*sys.exc_info())), None)
            ret = 1
            out = None
            err = None

        return out, err, ret, val, exc


class MPIWorker(rp.raptor.MPIWorker):
    def _dispatch_func(self, task):
        return super()._dispatch_func(task)


class DefaultWorker(rp.raptor.DefaultWorker):
    def _dispatch_func(self, task):
        return ParslWorker()._dispatch_func(task)

    def _dispatch_proc(self, task):
        return ParslWorker()._dispatch_proc(task)