File: globus.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (309 lines) | stat: -rw-r--r-- 12,013 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
import json
import logging
import os
from functools import partial
from typing import Optional

import globus_sdk
import typeguard

import parsl
from parsl.app.app import python_app
from parsl.data_provider.staging import Staging
from parsl.utils import RepresentationMixin

logger = logging.getLogger(__name__)

# globus staging must be run explicitly in the same process/interpreter
# as the DFK. it relies on persisting globus state between actions in that
# process.

"""
'Parsl Application' OAuth2 client registered with Globus Auth
by lukasz@globusid.org
"""
CLIENT_ID = '8b8060fd-610e-4a74-885e-1051c71ad473'
REDIRECT_URI = 'https://auth.globus.org/v2/web/auth-code'
SCOPES = ('openid '
          'urn:globus:auth:scope:transfer.api.globus.org:all')


get_input = getattr(__builtins__, 'raw_input', input)


def _get_globus_provider(dfk, executor_label):
    """Return the GlobusStaging provider configured on the named executor.

    Raises ValueError when no label is given or the executor lacks a
    storage_access attribute, and a generic Exception when none of the
    configured storage providers is a GlobusStaging instance.
    """
    if executor_label is None:
        raise ValueError("executor_label is mandatory")
    executor = dfk.executors[executor_label]
    if not hasattr(executor, "storage_access"):
        raise ValueError("specified executor does not have storage_access attribute")
    matches = (p for p in executor.storage_access if isinstance(p, GlobusStaging))
    found = next(matches, None)
    if found is not None:
        return found
    raise Exception('No suitable Globus endpoint defined for executor {}'.format(executor_label))


def get_globus():
    """Initialize the shared Globus class state (once) and return an instance."""
    Globus.init()
    instance = Globus()
    return instance


class Globus:
    """
    All communication with the Globus Auth and Globus Transfer services is enclosed
    in the Globus class. In particular, the Globus class is responsible for:
     - managing an OAuth2 authorizer - getting access and refresh tokens,
       refreshing an access token, storing to and retrieving tokens from
       .globus.json file,
     - submitting file transfers,
     - monitoring transfers.

    All state (the authorizer and the token-file path) is held at class
    level, so every instance in a process shares one authorization.
    """

    # Shared globus_sdk.RefreshTokenAuthorizer, populated once by init().
    authorizer = None

    @classmethod
    def init(cls):
        """Set the token-file path and create the authorizer on first call.

        May prompt the user interactively (via _do_native_app_authentication)
        when no cached tokens are available.
        """
        # Tokens are cached under ~/.parsl/.globus.json.
        token_path = os.path.join(os.path.expanduser('~'), '.parsl')
        if not os.path.isdir(token_path):
            os.mkdir(token_path)
        cls.TOKEN_FILE = os.path.join(token_path, '.globus.json')

        # Already authorized earlier in this process - nothing more to do.
        if cls.authorizer:
            return
        cls.authorizer = cls._get_native_app_authorizer(CLIENT_ID)

    @classmethod
    def get_authorizer(cls):
        """Return the shared authorizer (None if init() has not run)."""
        return cls.authorizer

    @classmethod
    def transfer_file(cls, src_ep, dst_ep, src_path, dst_path):
        """Submit a one-file Globus transfer and block until it terminates.

        Raises Exception if submission fails or if the finished task's
        status is anything other than SUCCEEDED.
        """
        tc = globus_sdk.TransferClient(authorizer=cls.authorizer)
        td = globus_sdk.TransferData(tc, src_ep, dst_ep)
        td.add_item(src_path, dst_path)
        try:
            task = tc.submit_transfer(td)
        except Exception as e:
            raise Exception('Globus transfer from {}{} to {}{} failed due to error: {}'.format(
                src_ep, src_path, dst_ep, dst_path, e))

        last_event_time = None
        """
        A Globus transfer job (task) can be in one of the three states: ACTIVE, SUCCEEDED, FAILED.
        Parsl every 15 seconds polls a status of the transfer job (task) from the Globus Transfer service,
        with 60 second timeout limit. If the task is ACTIVE after time runs out 'task_wait' returns False,
        and True otherwise.
        """
        while not tc.task_wait(task['task_id'], timeout=60):
            task = tc.get_task(task['task_id'])
            # Get the last error Globus event
            task_id = task['task_id']
            for event in tc.task_event_list(task_id):
                # Only log events not already reported in a previous poll.
                if event['time'] != last_event_time:
                    last_event_time = event['time']
                    logger.warning(
                        'Non-critical Globus Transfer error event for globus://{}{}: "{}" at {}. Retrying...'.format(
                            src_ep, src_path, event['description'], event['time']))
                    logger.debug('Globus Transfer error details: {}'.format(event['details']))
        """
        The Globus transfer job (task) has been terminated (is not ACTIVE). Check if the transfer
        SUCCEEDED or FAILED.
        """
        task = tc.get_task(task['task_id'])
        if task['status'] == 'SUCCEEDED':
            logger.debug('Globus transfer {}, from {}{} to {}{} succeeded'.format(
                task['task_id'], src_ep, src_path, dst_ep, dst_path))
        else:
            logger.debug('Globus Transfer task: {}'.format(task))
            # Surface the most recent event's details in the raised error.
            events = tc.task_event_list(task['task_id'])
            event = events.data[0]
            raise Exception('Globus transfer {}, from {}{} to {}{} failed due to error: "{}"'.format(
                task['task_id'], src_ep, src_path, dst_ep, dst_path, event['details']))

    @classmethod
    def _load_tokens_from_file(cls, filepath):
        """Read and return the cached token dict from a JSON file."""
        with open(filepath, 'r') as f:
            tokens = json.load(f)
        return tokens

    @classmethod
    def _save_tokens_to_file(cls, filepath, tokens):
        """Write the token dict to a JSON file."""
        with open(filepath, 'w') as f:
            json.dump(tokens, f)

    @classmethod
    def _update_tokens_file_on_refresh(cls, token_response):
        """on_refresh callback: persist newly refreshed tokens to disk."""
        cls._save_tokens_to_file(cls.TOKEN_FILE, token_response.by_resource_server)

    @classmethod
    def _do_native_app_authentication(cls, client_id, redirect_uri,
                                      requested_scopes=None):
        """Run the interactive OAuth2 native-app flow.

        Prints an authorization URL, reads the auth code from stdin and
        returns the resulting tokens keyed by resource server.
        """

        client = globus_sdk.NativeAppAuthClient(client_id=client_id)
        client.oauth2_start_flow(
            requested_scopes=requested_scopes,
            redirect_uri=redirect_uri,
            refresh_tokens=True)

        url = client.oauth2_get_authorize_url()
        print('Please visit the following URL to provide authorization: \n{}'.format(url))
        auth_code = get_input('Enter the auth code: ').strip()
        token_response = client.oauth2_exchange_code_for_tokens(auth_code)
        return token_response.by_resource_server

    @classmethod
    def _get_native_app_authorizer(cls, client_id):
        """Build a RefreshTokenAuthorizer from cached or freshly acquired tokens."""
        tokens = None
        # Best effort: a missing or unreadable cache just triggers a fresh login.
        try:
            tokens = cls._load_tokens_from_file(cls.TOKEN_FILE)
        except Exception:
            pass

        if not tokens:
            tokens = cls._do_native_app_authentication(
                client_id=client_id,
                redirect_uri=REDIRECT_URI,
                requested_scopes=SCOPES)
            # Best effort: failure to cache tokens should not abort authorization.
            try:
                cls._save_tokens_to_file(cls.TOKEN_FILE, tokens)
            except Exception:
                pass

        transfer_tokens = tokens['transfer.api.globus.org']

        auth_client = globus_sdk.NativeAppAuthClient(client_id=client_id)

        # The authorizer transparently refreshes the access token and
        # re-persists it via the on_refresh callback.
        return globus_sdk.RefreshTokenAuthorizer(
            transfer_tokens['refresh_token'],
            auth_client,
            access_token=transfer_tokens['access_token'],
            expires_at=transfer_tokens['expires_at_seconds'],
            on_refresh=cls._update_tokens_file_on_refresh)


class GlobusStaging(Staging, RepresentationMixin):
    """Specification for accessing data on a remote executor via Globus.

    Files using the ``globus`` URL scheme are staged to and from the
    executor's filesystem by Globus Transfer tasks submitted from internal
    '_parsl_internal' apps.

    Parameters
    ----------
    endpoint_uuid : str
        Universally unique identifier of the Globus endpoint at which the data can be accessed.
        This can be found in the `Manage Endpoints <https://www.globus.org/app/endpoints>`_ page.
    endpoint_path : str, optional
        FIXME
    local_path : str, optional
        FIXME
    """

    @typeguard.typechecked
    def __init__(self, endpoint_uuid: str, endpoint_path: Optional[str] = None, local_path: Optional[str] = None):
        self.endpoint_uuid = endpoint_uuid
        self.endpoint_path = endpoint_path
        self.local_path = local_path
        # Created lazily by initialize_globus() on first transfer.
        self.globus = None

    def can_stage_in(self, file):
        """A file can be staged in iff it uses the globus URL scheme."""
        logger.debug("Globus checking file {}".format(repr(file)))
        return file.scheme == 'globus'

    def can_stage_out(self, file):
        """A file can be staged out iff it uses the globus URL scheme."""
        logger.debug("Globus checking file {}".format(repr(file)))
        return file.scheme == 'globus'

    def stage_in(self, dm, executor, file, parent_fut):
        """Build and launch a stage-in app; return the future for the staged file."""
        staging_provider = _get_globus_provider(dm.dfk, executor)
        staging_provider._update_local_path(file, executor, dm.dfk)
        app = staging_provider._globus_stage_in_app(executor=executor, dfk=dm.dfk)
        fut = app(outputs=[file], _parsl_staging_inhibit=True, parent_fut=parent_fut)
        return fut._outputs[0]

    def stage_out(self, dm, executor, file, app_fu):
        """Build and launch a stage-out app that runs once app_fu completes."""
        staging_provider = _get_globus_provider(dm.dfk, executor)
        staging_provider._update_local_path(file, executor, dm.dfk)
        app = staging_provider._globus_stage_out_app(executor=executor, dfk=dm.dfk)
        return app(app_fu, _parsl_staging_inhibit=True, inputs=[file])

    def _globus_stage_in_app(self, executor, dfk):
        """Wrap _globus_stage_in (bound to this provider/executor) as a python_app."""
        target = dfk.executors[executor]
        bound = partial(_globus_stage_in, self, target)
        return python_app(executors=['_parsl_internal'], data_flow_kernel=dfk)(bound)

    def _globus_stage_out_app(self, executor, dfk):
        """Wrap _globus_stage_out (bound to this provider/executor) as a python_app."""
        target = dfk.executors[executor]
        bound = partial(_globus_stage_out, self, target)
        return python_app(executors=['_parsl_internal'], data_flow_kernel=dfk)(bound)

    # could this happen at __init__ time?
    def initialize_globus(self):
        """Create the shared Globus helper on first use; later calls are no-ops."""
        if self.globus is not None:
            return
        self.globus = get_globus()

    def _get_globus_endpoint(self, executor):
        """Resolve endpoint uuid/path and working dir for the given executor.

        When both endpoint_path and local_path are configured, the endpoint
        path is the configured endpoint_path extended by working_dir's path
        relative to local_path; otherwise it is the working_dir itself.
        """
        if not executor.working_dir:
            raise ValueError("executor working_dir must be specified for GlobusStaging")
        working_dir = os.path.normpath(executor.working_dir)
        if self.endpoint_path and self.local_path:
            local_path = os.path.normpath(self.local_path)
            common_path = os.path.commonpath((local_path, working_dir))
            # working_dir must live underneath (or be equal to) local_path.
            if local_path != common_path:
                raise Exception('"local_path" must be equal or an absolute subpath of "working_dir"')
            relative_path = os.path.relpath(working_dir, common_path)
            endpoint_path = os.path.join(os.path.normpath(self.endpoint_path), relative_path)
        else:
            endpoint_path = working_dir
        return {'endpoint_uuid': self.endpoint_uuid,
                'endpoint_path': endpoint_path,
                'working_dir': working_dir}

    def _update_local_path(self, file, executor, dfk):
        """Point file.local_path at the file's location in the executor working dir."""
        endpoint = self._get_globus_endpoint(dfk.executors[executor])
        file.local_path = os.path.join(endpoint['working_dir'], file.filename)


# this cannot be a class method, but must be a function, because I want
# to be able to use partial() on it - and partial() does not work on
# class methods
def _globus_stage_in(provider, executor, parent_fut=None, outputs=(), _parsl_staging_inhibit=True):
    """Transfer outputs[0] from its Globus source onto the executor's endpoint.

    Runs as a '_parsl_internal' python_app with provider and executor bound
    via partial() by GlobusStaging._globus_stage_in_app. parent_fut exists
    only as a dependency hook; _parsl_staging_inhibit stops parsl from
    re-staging the files handled here.

    Note: the default for ``outputs`` is an (immutable) empty tuple rather
    than the former mutable ``[]`` default; callers always supply outputs.
    """
    globus_ep = provider._get_globus_endpoint(executor)
    file = outputs[0]
    # Destination path inside the executor-side Globus endpoint.
    dst_path = os.path.join(
            globus_ep['endpoint_path'], file.filename)

    provider.initialize_globus()

    provider.globus.transfer_file(
            file.netloc, globus_ep['endpoint_uuid'],
            file.path, dst_path)


def _globus_stage_out(provider, executor, app_fu, inputs=(), _parsl_staging_inhibit=True):
    """Transfer inputs[0] from the executor's endpoint back to its Globus home.

    Although app_fu isn't directly used in the stage out code,
    it is needed as an input dependency to ensure this code
    doesn't run until the app_fu is complete. The actual change
    that is represented by the app_fu completing is that the
    executor filesystem will now contain the file to stage out.

    Note: the default for ``inputs`` is an (immutable) empty tuple rather
    than the former mutable ``[]`` default; callers always supply inputs.
    """
    globus_ep = provider._get_globus_endpoint(executor)
    file = inputs[0]
    # Source path inside the executor-side Globus endpoint.
    src_path = os.path.join(globus_ep['endpoint_path'], file.filename)

    provider.initialize_globus()

    provider.globus.transfer_file(
        globus_ep['endpoint_uuid'], file.netloc,
        src_path, file.path
    )


def cli_run():
    """Command-line entry point: run the Globus auth flow if tokens are missing."""
    parsl.set_stream_logger()
    banner = (
        "Parsl Globus command-line authorizer",
        "If authorization to Globus is necessary, the library will prompt you now.",
        "Otherwise it will do nothing",
    )
    for line in banner:
        print(line)
    get_globus()
    print("Authorization complete")