1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
import logging
import os
import zipfile
from typing import Tuple
import filelock
import parsl
from parsl.data_provider.files import File
from parsl.data_provider.staging import Staging
from parsl.errors import ParslError
# Module-level logger for this staging provider.
logger = logging.getLogger(__name__)
class ZipAuthorityError(ParslError):
    """Raised when a ``zip:`` URL contains an authority (netloc) section.

    ``ZipFileStaging`` only understands local paths of the form
    ``zip:/path/to/archive.zip/inside/file``; an authority section such as
    ``zip://host/...`` is rejected rather than silently ignored.
    """

    def __init__(self, file):
        # Pass ``file`` to the base class so it is recorded in ``self.args``:
        # exceptions are reconstructed from ``args`` when unpickled, and an
        # empty ``args`` would make ``ZipAuthorityError()`` fail (missing
        # ``file``) on the receiving side.
        super().__init__(file)
        self.file = file

    def __str__(self):
        return f"ZipFileStaging cannot stage Files with an authority (netloc) section ({self.file.netloc}), for {self.file.url}"
class ZipFileStaging(Staging):
    """A staging provider for zip files.

    Stage-out writes a task's output file into the specified zip file.
    Stage-in extracts a member of an existing zip file into the executor's
    working directory.

    The names of both the zip file and the member contained in it are
    given using a ``zip:`` URL, like this::

        zip:/tmp/foo/this.zip/inside/here.txt

    This URL names a zip file ``/tmp/foo/this.zip`` containing a file
    ``inside/here.txt``.

    Both directions require the executor to have a ``working_dir``. The
    member is placed locally at ``<working_dir>/<path inside zip>``.

    The provider uses the Python ``filelock`` package to lock the zip file so
    that it does not conflict with other instances of itself. This lock will
    not protect against other modifications to the zip file.
    """

    def can_stage_out(self, file: File) -> bool:
        return self.is_zip_url(file)

    def can_stage_in(self, file: File) -> bool:
        return self.is_zip_url(file)

    def is_zip_url(self, file: File) -> bool:
        """Return True if *file* is a ``zip:`` URL this provider can handle.

        :raises ZipAuthorityError: if *file* uses the ``zip`` scheme but has
            an authority (netloc) section, which this provider does not
            support.
        """
        logger.debug("archive provider checking File %r", file)

        # First check if this is the scheme we care about.
        if file.scheme != "zip":
            return False

        # Basic validation: the user may have specified an authority section
        # expecting it to mean something, so reject it rather than ignore it.
        if file.netloc != "":
            raise ZipAuthorityError(file)

        # If we got this far, we can stage this file.
        return True

    def _require_working_dir(self, dm, executor):
        """Return the ``working_dir`` of *executor*.

        :raises RuntimeError: if the executor has no working_dir configured;
            without one there is nowhere to place the member locally.
        """
        working_dir = dm.dfk.executors[executor].working_dir
        if not working_dir:
            raise RuntimeError("zip file staging requires a working_dir to be specified")
        return working_dir

    def stage_out(self, dm, executor, file, parent_fut):
        """Schedule archiving of *file* into its zip file once *parent_fut* completes.

        Sets ``file.local_path`` to ``<working_dir>/<path inside zip>``, which
        is where the task should write its output, and returns the future of
        the stage-out app.

        :raises RuntimeError: if the executor has no working_dir.
        """
        assert file.scheme == 'zip'

        zip_path, inside_path = zip_path_split(file.path)
        working_dir = self._require_working_dir(dm, executor)
        file.local_path = os.path.join(working_dir, inside_path)

        # TODO: I think it's the right behaviour that a staging out provider
        # should create the directory structure for the file to be placed in?
        os.makedirs(os.path.dirname(file.local_path), exist_ok=True)

        stage_out_app = _zip_stage_out_app(dm)
        app_fut = stage_out_app(zip_path, inside_path, working_dir, inputs=[file],
                                _parsl_staging_inhibit=True, parent_fut=parent_fut)
        return app_fut

    def stage_in(self, dm, executor, file, parent_fut):
        """Schedule extraction of *file*'s member from its zip file.

        Sets ``file.local_path`` to ``<working_dir>/<path inside zip>``, which
        is where the member is extracted to.

        :raises RuntimeError: if the executor has no working_dir. This is the
            same check ``stage_out`` performs. Without it, the stage-in app
            would have no local path to write to.
        """
        assert file.scheme == 'zip'

        zip_path, inside_path = zip_path_split(file.path)
        working_dir = self._require_working_dir(dm, executor)
        file.local_path = os.path.join(working_dir, inside_path)

        stage_in_app = _zip_stage_in_app(dm)
        app_fut = stage_in_app(zip_path, inside_path, working_dir, outputs=[file],
                               _parsl_staging_inhibit=True, parent_fut=parent_fut)
        # Return the future for outputs[0] (the extracted file) rather than
        # the app's own future.
        return app_fut._outputs[0]
def _zip_stage_out(zip_file, inside_path, working_dir, parent_fut=None, inputs=(), _parsl_staging_inhibit=True):
    """Append the local file ``inputs[0]`` to *zip_file* under the name *inside_path*.

    Creates the zip file (and its parent directory) if needed. Removes the
    local file once it has been archived. Access to the zip file is
    serialised with a ``<zip_file>.lock`` file lock.

    *working_dir*, *parent_fut* and *_parsl_staging_inhibit* are not used in
    the body. They are presumably consumed by the parsl app machinery that
    wraps this function.
    """
    file = inputs[0]

    # A zip path with no directory component (e.g. "out.zip") has an empty
    # dirname, and os.makedirs("") raises FileNotFoundError. Only create a
    # parent directory when there is one to create.
    zip_dir = os.path.dirname(zip_file)
    if zip_dir:
        os.makedirs(zip_dir, exist_ok=True)

    with filelock.FileLock(zip_file + ".lock"):
        with zipfile.ZipFile(zip_file, mode='a', compression=zipfile.ZIP_DEFLATED) as z:
            z.write(file, arcname=inside_path)

    os.remove(file)
def _zip_stage_out_app(dm):
    """Build a parsl python_app that runs ``_zip_stage_out`` on the internal executor."""
    make_app = parsl.python_app(executors=['_parsl_internal'], data_flow_kernel=dm.dfk)
    return make_app(_zip_stage_out)
def _zip_stage_in(zip_file, inside_path, working_dir, *, parent_fut, outputs, _parsl_staging_inhibit=True):
    """Extract member *inside_path* of *zip_file* into the local file ``outputs[0]``.

    The archive is read under a ``<zip_file>.lock`` file lock. The extracted
    bytes are written to the output path after the lock is released. The
    output's parent directory is created if it does not exist.

    *working_dir*, *parent_fut* and *_parsl_staging_inhibit* are not used in
    the body. They are presumably consumed by the parsl app machinery that
    wraps this function.

    :raises KeyError: if *zip_file* has no member named *inside_path*.
    """
    out_path = os.fspath(outputs[0])

    # Hold the lock only while reading the archive, so that other stage-ins
    # and stage-outs on the same zip file are not blocked by the local write.
    with filelock.FileLock(zip_file + ".lock"):
        with zipfile.ZipFile(zip_file, mode='r') as z:
            content = z.read(inside_path)

    # The member may sit in a sub-directory (e.g. "inside/here.txt") that does
    # not exist yet under the working directory. os.makedirs("") would raise,
    # so skip it when there is no directory component.
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    with open(out_path, "wb") as of:
        of.write(content)
def _zip_stage_in_app(dm):
    """Build a parsl python_app that runs ``_zip_stage_in`` on the internal executor."""
    make_app = parsl.python_app(executors=['_parsl_internal'], data_flow_kernel=dm.dfk)
    return make_app(_zip_stage_in)
def zip_path_split(path: str) -> Tuple[str, str]:
    """Split a zip: path into a zip file name and a contained-file name.

    The split happens at the first occurrence of ``.zip/``. For example,
    ``/tmp/foo/this.zip/inside/here.txt`` becomes
    ``("/tmp/foo/this.zip", "inside/here.txt")``.

    :raises ValueError: if *path* does not contain ``.zip/`` followed by a
        non-empty member name.
    """
    head, sep, inside_path = path.partition(".zip/")
    # Without this check, a path lacking ".zip/" would be split at a bogus
    # offset and produce meaningless names instead of an error.
    if not sep or not inside_path:
        raise ValueError(
            f"zip: path must name a file inside a .zip archive, "
            f"like /path/to/archive.zip/inside/file.txt; got {path!r}"
        )
    return (head + ".zip", inside_path)
|