1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
import logging
import os
from parsl.data_provider.staging import Staging
from parsl.utils import RepresentationMixin
# Module logger; the submit-side RSyncStaging methods log through it
# (e.g. when replacing a task with an rsync wrapper).
logger = logging.getLogger(__name__)
class RSyncStaging(Staging, RepresentationMixin):
    """
    This staging provider executes rsync on worker nodes to stage
    ``file`` scheme files in from, and out to, a remote location.

    Stage-in: the task is wrapped so that, on the worker, rsync copies
    ``hostname:<absolute path>`` to the worker-side local path before the
    task body runs. Stage-out: the task is wrapped so that, after the task
    body returns, rsync copies the worker-side local path back to
    ``hostname:<absolute path>``.

    Worker nodes must be able to authenticate to the rsync server
    without interactive authentication - for example, worker
    initialization could include an appropriate SSH key configuration.

    The submit side will need to run an rsync-compatible server (for example,
    an ssh server with the rsync binary installed).

    :param hostname: remote host used as the ``hostname:path`` prefix in
        the rsync arguments run on the worker.
    """

    def __init__(self, hostname):
        self.hostname = hostname

    def can_stage_in(self, file):
        """Return True if ``file`` uses the ``file`` scheme, the only one handled."""
        return file.scheme == "file"

    def can_stage_out(self, file):
        """Return True if ``file`` uses the ``file`` scheme, the only one handled."""
        return file.scheme == "file"

    @staticmethod
    def _prepare_paths(dm, executor, file):
        """Set ``file.path`` to an absolute path and compute ``file.local_path``.

        ``file.local_path`` is ``file.filename`` placed inside the executor's
        ``working_dir`` when one is configured, or the bare filename otherwise.
        """
        # We need to make path an absolute path, because the rsync remote
        # name (hostname:path) needs to include an absolute path.
        file.path = os.path.abspath(file.path)

        working_dir = dm.dfk.executors[executor].working_dir
        if working_dir:
            file.local_path = os.path.join(working_dir, file.filename)
        else:
            file.local_path = file.filename

    def stage_in(self, dm, executor, file, parent_fut):
        """Prepare ``file``'s remote and worker-side paths for stage-in.

        Returns None; the copy itself is performed on the worker by the
        wrapper installed in :meth:`replace_task`. ``parent_fut`` is unused.
        """
        self._prepare_paths(dm, executor, file)
        return None

    def stage_out(self, dm, executor, file, parent_fut):
        """Prepare ``file``'s remote and worker-side paths for stage-out.

        Returns None; the copy itself is performed on the worker by the
        wrapper installed in :meth:`replace_task_stage_out`. ``parent_fut``
        is unused.
        """
        self._prepare_paths(dm, executor, file)
        return None

    def replace_task(self, dm, executor, file, f):
        """Return ``f`` wrapped so rsync fetches ``file`` before ``f`` runs."""
        logger.debug("Replacing task for rsync stagein")
        working_dir = dm.dfk.executors[executor].working_dir
        return in_task_stage_in_wrapper(f, file, working_dir, self.hostname)

    def replace_task_stage_out(self, dm, executor, file, f):
        """Return ``f`` wrapped so rsync pushes ``file`` after ``f`` returns."""
        logger.debug("Replacing task for rsync stageout")
        working_dir = dm.dfk.executors[executor].working_dir
        return in_task_stage_out_wrapper(f, file, working_dir, self.hostname)
def in_task_stage_in_wrapper(func, file, working_dir, hostname):
    """Wrap ``func`` so that it first rsyncs ``file`` from ``hostname``.

    The returned callable creates ``working_dir`` (if one is given). It then
    copies ``hostname:file.path`` to ``file.local_path`` with rsync, and
    finally calls ``func`` with the original arguments.

    :param func: the task function to wrap.
    :param file: object with ``path`` (absolute path on ``hostname``) and
        ``local_path`` (destination path on the worker).
    :param working_dir: directory to create before copying, or a falsy value
        to skip directory creation.
    :param hostname: host prefix for the rsync remote argument.
    :returns: a callable taking the same arguments as ``func`` and returning
        its result.
    :raises RuntimeError: if rsync cannot be started or exits non-zero; in
        that case ``func`` is not called.
    """
    def wrapper(*args, **kwargs):
        # Imported locally, mirroring the original local ``logging`` import.
        import logging
        import subprocess
        logger = logging.getLogger(__name__)
        logger.debug("rsync in_task_stage_in_wrapper start")
        if working_dir:
            os.makedirs(working_dir, exist_ok=True)

        logger.debug("rsync in_task_stage_in_wrapper calling rsync")
        # Pass an argument list with no shell, so paths containing spaces or
        # shell metacharacters reach rsync verbatim instead of being split or
        # executed by /bin/sh.
        # NOTE(review): older rsync versions may still word-split the remote
        # path on the remote side unless --protect-args is used.
        cmd = ["rsync", "{}:{}".format(hostname, file.path), file.local_path]
        try:
            r = subprocess.run(cmd, check=False).returncode
        except OSError as e:
            raise RuntimeError("rsync could not be started: {}".format(e)) from e
        if r != 0:
            raise RuntimeError("rsync returned {}, a {}".format(r, type(r)))
        logger.debug("rsync in_task_stage_in_wrapper calling wrapped function")
        result = func(*args, **kwargs)
        logger.debug("rsync in_task_stage_in_wrapper returned from wrapped function")
        return result
    return wrapper
def in_task_stage_out_wrapper(func, file, working_dir, hostname):
    """Wrap ``func`` so that its output ``file`` is rsynced to ``hostname``.

    The returned callable creates ``working_dir`` (if one is given) and calls
    ``func`` with the original arguments. It then copies ``file.local_path``
    to ``hostname:file.path`` with rsync and returns ``func``'s result.

    :param func: the task function to wrap.
    :param file: object with ``path`` (absolute destination path on
        ``hostname``) and ``local_path`` (source path on the worker).
    :param working_dir: directory to create before running ``func``, or a
        falsy value to skip directory creation.
    :param hostname: host prefix for the rsync remote argument.
    :returns: a callable taking the same arguments as ``func`` and returning
        its result.
    :raises RuntimeError: if rsync cannot be started or exits non-zero (after
        ``func`` has already run).
    """
    def wrapper(*args, **kwargs):
        # Imported locally, mirroring the original local ``logging`` import.
        import logging
        import subprocess
        logger = logging.getLogger(__name__)
        logger.debug("rsync in_task_stage_out_wrapper start")
        # rsync later reads file.local_path, which lies inside working_dir
        # when one is set; presumably the task writes it there, so make sure
        # the directory exists even if no stage-in wrapper ran for this task.
        if working_dir:
            os.makedirs(working_dir, exist_ok=True)

        logger.debug("rsync in_task_stage_out_wrapper calling wrapped function")
        result = func(*args, **kwargs)
        logger.debug("rsync in_task_stage_out_wrapper returned from wrapped function, calling rsync")
        # Pass an argument list with no shell, so paths containing spaces or
        # shell metacharacters reach rsync verbatim instead of being split or
        # executed by /bin/sh.
        cmd = ["rsync", file.local_path, "{}:{}".format(hostname, file.path)]
        try:
            r = subprocess.run(cmd, check=False).returncode
        except OSError as e:
            raise RuntimeError("rsync could not be started: {}".format(e)) from e
        if r != 0:
            raise RuntimeError("rsync returned {}, a {}".format(r, type(r)))
        logger.debug("rsync in_task_stage_out_wrapper returned from rsync")
        return result
    return wrapper
|