File: data_manager.py

package info (click to toggle)
python-parsl 2025.01.13+ds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (168 lines) | stat: -rw-r--r-- 7,319 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import logging
from concurrent.futures import Future
from typing import TYPE_CHECKING, Any, Callable, List, Optional

from parsl.app.futures import DataFuture
from parsl.data_provider.file_noop import NoOpFileStaging
from parsl.data_provider.files import File
from parsl.data_provider.ftp import FTPSeparateTaskStaging
from parsl.data_provider.http import HTTPSeparateTaskStaging
from parsl.data_provider.staging import Staging
from parsl.data_provider.zip import ZipFileStaging

if TYPE_CHECKING:
    from parsl.dataflow.dflow import DataFlowKernel

logger = logging.getLogger(__name__)

# Staging providers consulted for any executor that does not supply its own
# ``storage_access`` list. The same instances are shared by every such
# executor, so they must not carry executor-specific state.
default_staging: List[Staging] = [
    NoOpFileStaging(),
    FTPSeparateTaskStaging(),
    HTTPSeparateTaskStaging(),
    ZipFileStaging(),
]


class DataManager:
    """The DataManager is responsible for transferring input and output data.

    It does not move data itself: for each file it walks an ordered list of
    :class:`Staging` providers (the executor's own ``storage_access`` list, or
    the module-level ``default_staging`` list when the executor has none) and
    delegates to the first provider that reports it can handle the file.
    """

    def __init__(self, dfk: "DataFlowKernel") -> None:
        """Initialize the DataManager.

        Args:
           - dfk (DataFlowKernel): The DataFlowKernel that this DataManager is managing data for.
             Its ``executors`` mapping is used to look up executors by label.

        """

        self.dfk = dfk

    def _storage_access_for(self, executor: str) -> List[Staging]:
        """Return the ordered list of staging providers to consult for ``executor``.

        If the executor object has no ``storage_access`` attribute, or that
        attribute is ``None``, the shared ``default_staging`` list is returned.
        An explicitly empty list is honoured as-is, so no provider will match.
        """
        executor_obj = self.dfk.executors[executor]
        storage_access = getattr(executor_obj, "storage_access", None)
        if storage_access is None:
            return default_staging
        return storage_access

    def replace_task_stage_out(self, file: File, func: Callable, executor: str) -> Callable:
        """Give a stage-out provider the chance to wrap (or replace entirely!) the task function.

        The first provider whose ``can_stage_out(file)`` is true decides: if its
        ``replace_task_stage_out`` returns a truthy callable, that callable is
        returned; otherwise ``func`` is returned unchanged.

        Args:
            - file (File) : the output file being staged out.
            - func (Callable) : the task function to (possibly) wrap.
            - executor (str) : label of the executor the task will run on.

        Raises:
            ValueError: if no provider can stage out ``file``.
        """
        for provider in self._storage_access_for(executor):
            logger.debug("stage_out checking Staging provider %s", provider)
            if provider.can_stage_out(file):
                newfunc = provider.replace_task_stage_out(self, executor, file, func)
                # A falsy result from the provider means "no wrapping needed".
                return newfunc if newfunc else func

        logger.debug("reached end of staging provider list")
        # if we reach here, we haven't found a suitable staging mechanism
        raise ValueError("Executor {} cannot stage file {}".format(executor, repr(file)))

    def optionally_stage_in(self, input, func, executor):
        """Stage in ``input`` if it is a DataFuture or File; otherwise pass it through.

        Args:
            - input : a task argument. Only DataFuture and File values are
              staged; anything else is returned untouched together with ``func``.
            - func (Callable) : the task function, which staging providers may
              wrap or replace.
            - executor (str) : label of the executor the task will run on.

        Returns:
            A ``(replacement_input, func)`` tuple: the value to pass to the task
            in place of ``input``, and the (possibly wrapped) task function.

        Raises:
            ValueError: if no provider can stage in the file.
        """
        if isinstance(input, DataFuture):
            file = input.file_obj.cleancopy()
            # replace the input DataFuture with a new DataFuture which will complete at
            # the same time as the original one, but will contain the newly
            # copied file
            input = DataFuture(input, file, tid=input.tid)
        elif isinstance(input, File):
            # stage a cleancopy() of the File rather than the caller's own object
            file = input.cleancopy()
            input = file
        else:
            return (input, func)

        replacement_input = self.stage_in(file, input, executor)

        func = self.replace_task(file, func, executor)

        return (replacement_input, func)

    def replace_task(self, file: File, func: Callable, executor: str) -> Callable:
        """Give a stage-in provider the chance to wrap (or replace entirely!) the task function.

        The first provider whose ``can_stage_in(file)`` is true decides: if its
        ``replace_task`` returns a truthy callable, that callable is returned;
        otherwise ``func`` is returned unchanged.

        Args:
            - file (File) : the input file being staged in.
            - func (Callable) : the task function to (possibly) wrap.
            - executor (str) : label of the executor the task will run on.

        Raises:
            ValueError: if no provider can stage in ``file``.
        """
        for provider in self._storage_access_for(executor):
            logger.debug("stage_in checking Staging provider %s", provider)
            if provider.can_stage_in(file):
                newfunc = provider.replace_task(self, executor, file, func)
                # A falsy result from the provider means "no wrapping needed".
                return newfunc if newfunc else func

        logger.debug("reached end of staging provider list")
        # if we reach here, we haven't found a suitable staging mechanism
        raise ValueError("Executor {} cannot stage file {}".format(executor, repr(file)))

    def stage_in(self, file: File, input: Any, executor: str) -> Any:
        """Launch any stage-in work needed to make ``file`` available on ``executor``.

        The first provider whose ``can_stage_in(file)`` is true is asked to
        stage the file in. If it returns a truthy value (presumably a future
        wrapping the stage-in operation), that value is returned so the task can
        depend on it; if it returns a falsy value, there is nothing to wait for
        and ``input`` is returned unaltered.

        Args:
            - self
            - file (File) : the file to stage in.
            - input (Any) : the DataFuture or File the task was given. A
              DataFuture is passed to the provider as the parent future that
              stage-in must wait for; a File has no parent.
            - executor (str) : an executor the file is going to be staged in to.

        Raises:
            ValueError: if ``input`` is neither a DataFuture nor a File
              (callers are expected to have filtered these out already), or if
              no provider can stage in ``file``.
        """

        parent_fut: Optional[Future]
        if isinstance(input, DataFuture):
            parent_fut = input
        elif isinstance(input, File):
            parent_fut = None
        else:
            raise ValueError("Internal consistency error - should have checked DataFuture/File earlier")

        for provider in self._storage_access_for(executor):
            logger.debug("stage_in checking Staging provider %s", provider)
            if provider.can_stage_in(file):
                staging_fut = provider.stage_in(self, executor, file, parent_fut=parent_fut)
                return staging_fut if staging_fut else input

        logger.debug("reached end of staging provider list")
        # if we reach here, we haven't found a suitable staging mechanism
        raise ValueError("Executor {} cannot stage file {}".format(executor, repr(file)))

    def stage_out(self, file: File, executor: str, app_fu: Future) -> Optional[Future]:
        """Arrange for ``file`` to be staged out once the task body has completed.

        The request is delegated to the first provider whose
        ``can_stage_out(file)`` is true. This function returns either a Future
        which should complete when the stageout is complete, or None, if no
        staging needs to be waited for.

        Args:
            - self
            - file (File) - file to stage out
            - executor (str) - Which executor the file is going to be staged out from.
            - app_fu (Future) - a future representing the main body of the task that should
                                complete before stageout begins.

        Raises:
            ValueError: if no provider can stage out ``file``.
        """
        for provider in self._storage_access_for(executor):
            logger.debug("stage_out checking Staging provider %s", provider)
            if provider.can_stage_out(file):
                return provider.stage_out(self, executor, file, app_fu)

        logger.debug("reached end of staging provider list")
        # if we reach here, we haven't found a suitable staging mechanism
        raise ValueError("Executor {} cannot stage out file {}".format(executor, repr(file)))