1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
|
"""Functions to compress the contents of a filesystem.
Currently zip and tar are supported, using the `zipfile` and
`tarfile` modules from the standard library.
"""
from __future__ import absolute_import, print_function, unicode_literals
import typing
import tarfile
import time
import zipfile
from datetime import datetime
from .enums import ResourceType
from .errors import MissingInfoNamespace, NoSysPath
from .path import relpath
from .time import datetime_to_epoch
from .walk import Walker
if typing.TYPE_CHECKING:
from typing import BinaryIO, Optional, Text, Tuple, Union
from .base import FS
ZipTime = Tuple[int, int, int, int, int, int]
def write_zip(
    src_fs,  # type: FS
    file,  # type: Union[Text, BinaryIO]
    compression=zipfile.ZIP_DEFLATED,  # type: int
    encoding="utf-8",  # type: Text
    walker=None,  # type: Optional[Walker]
):
    # type: (...) -> None
    """Write the contents of a filesystem to a zip file.

    Every resource yielded by the walker is added to the archive:
    directories as empty entries whose name ends with a slash, files
    either straight from their system path (when the filesystem can
    provide one) or from their bytes read through ``src_fs``.

    Arguments:
        src_fs (~fs.base.FS): The source filesystem to compress.
        file (str or io.IOBase): Destination file, may be a file name
            or an open file object.
        compression (int): Compression to use (one of the constants
            defined in the `zipfile` module in the stdlib). Defaults
            to `zipfile.ZIP_DEFLATED`. Applied to every file entry,
            whichever way its content is obtained.
        encoding (str): Kept for backward compatibility of the
            signature; the value is not read anywhere in this
            function.
        walker (~fs.walk.Walker, optional): A `Walker` instance, or `None`
            to use default walker. You can use this to specify which files
            you want to compress.

    """
    # Resolve the walker first so nothing can fail between creating the
    # archive object and entering its context manager.
    walker = walker or Walker()
    _zip = zipfile.ZipFile(file, mode="w", compression=compression, allowZip64=True)

    with _zip:
        gen_walk = walker.info(src_fs, namespaces=["details", "stat", "access"])
        for path, info in gen_walk:
            # Zip names must be relative, directory names must end
            # with a slash.
            zip_name = relpath(path + "/" if info.is_dir else path)

            if info.has_namespace("stat"):
                # If the resource has a stat namespace, get the zip
                # time directly from the stat structure. A missing
                # ``st_mtime`` gives ``None``, which `time.localtime`
                # treats as "now".
                st_mtime = info.get("stat", "st_mtime", None)
                _mtime = time.localtime(st_mtime)
                zip_time = _mtime[0:6]  # type: ZipTime
            else:
                # Otherwise, use the modified time from the details
                # namespace, falling back to the current UTC time.
                mt = info.modified or datetime.utcnow()
                zip_time = (mt.year, mt.month, mt.day, mt.hour, mt.minute, mt.second)

            # NOTE(@althonos): typeshed's `zipfile.py` only declares
            #     ZipInfo.__init__ for Python < 3 ?!
            zip_info = zipfile.ZipInfo(zip_name, zip_time)  # type: ignore

            try:
                # The Unix mode lives in the high 16 bits of external_attr.
                if info.permissions is not None:
                    zip_info.external_attr = info.permissions.mode << 16
            except MissingInfoNamespace:
                # Best effort: no permission information available for
                # this resource, keep the ZipInfo defaults.
                pass

            if info.is_dir:
                # 0x10 is the MS-DOS directory flag.
                zip_info.external_attr |= 0x10
                # This is how to record directories with zipfile
                _zip.writestr(zip_info, b"")
            else:
                # Get a syspath if possible
                try:
                    sys_path = src_fs.getsyspath(path)
                except NoSysPath:
                    # When given a ZipInfo, `writestr` takes the
                    # compression method from the ZipInfo itself, and a
                    # new ZipInfo defaults to ZIP_STORED. Set it
                    # explicitly so the requested compression is honoured.
                    zip_info.compress_type = compression
                    # Write from bytes
                    _zip.writestr(zip_info, src_fs.readbytes(path))
                else:
                    # Write from a file which is (presumably)
                    # more memory efficient. `ZipFile.write` builds its
                    # own entry from the file on disk and uses the
                    # archive-level compression.
                    _zip.write(sys_path, zip_name)
def write_tar(
    src_fs,  # type: FS
    file,  # type: Union[Text, BinaryIO]
    compression=None,  # type: Optional[Text]
    encoding="utf-8",  # type: Text
    walker=None,  # type: Optional[Walker]
):
    # type: (...) -> None
    """Write the contents of a filesystem to a tar file.

    Every resource yielded by the walker becomes one archive member.
    Directories are added as header-only entries; any other resource
    has its content streamed from ``src_fs.openbin``.

    Arguments:
        src_fs (~fs.base.FS): The source filesystem to compress.
        file (str or io.IOBase): Destination file, may be a file
            name or an open file object.
        compression (str, optional): Compression to use, or `None`
            for a plain Tar archive without compression. The value
            is appended to the ``"w:"`` open mode given to
            `tarfile.open`.
        encoding (str): The encoding to use for filenames. The
            default is ``"utf-8"``. Forwarded to `tarfile.open`.
        walker (~fs.walk.Walker, optional): A `Walker` instance, or
            `None` to use default walker. You can use this to specify
            which files you want to compress.

    """
    type_map = {
        ResourceType.block_special_file: tarfile.BLKTYPE,
        ResourceType.character: tarfile.CHRTYPE,
        ResourceType.directory: tarfile.DIRTYPE,
        ResourceType.fifo: tarfile.FIFOTYPE,
        ResourceType.file: tarfile.REGTYPE,
        ResourceType.socket: tarfile.AREGTYPE,  # no type for socket
        ResourceType.symlink: tarfile.SYMTYPE,
        ResourceType.unknown: tarfile.AREGTYPE,  # no type for unknown
    }

    # Pairs of (TarInfo attribute, Info attribute) copied when available.
    tar_attr = [("uid", "uid"), ("gid", "gid"), ("uname", "user"), ("gname", "group")]

    # Resolve the walker first so nothing can fail between opening the
    # archive and entering its context manager.
    walker = walker or Walker()

    mode = "w:{}".format(compression or "")
    # Pass the encoding on so the documented parameter is actually
    # applied instead of silently falling back to tarfile's default.
    if isinstance(file, (str, bytes)):
        _tar = tarfile.open(file, mode=mode, encoding=encoding)
    else:
        _tar = tarfile.open(fileobj=file, mode=mode, encoding=encoding)

    # Fallback modification time shared by all members lacking one.
    current_time = time.time()

    with _tar:
        gen_walk = walker.info(src_fs, namespaces=["details", "stat", "access"])
        for path, info in gen_walk:
            # Tar names must be relative
            tar_name = relpath(path)
            tar_info = tarfile.TarInfo(tar_name)

            if info.has_namespace("stat"):
                mtime = info.get("stat", "st_mtime", current_time)
            else:
                mtime = info.modified or current_time

            # Normalise to an integer number of seconds since the epoch.
            if isinstance(mtime, datetime):
                mtime = datetime_to_epoch(mtime)
            if isinstance(mtime, float):
                mtime = int(mtime)
            tar_info.mtime = mtime

            # Copy ownership details, skipping anything missing or None.
            for tarattr, infoattr in tar_attr:
                value = getattr(info, infoattr, None)
                if value is not None:
                    setattr(tar_info, tarattr, value)

            if info.has_namespace("access"):
                # 0o420 is the fallback when the permissions object
                # exposes no ``mode`` attribute.
                tar_info.mode = getattr(info.permissions, "mode", 0o420)

            if info.is_dir:
                tar_info.type = tarfile.DIRTYPE
                _tar.addfile(tar_info)
            else:
                # Resource types missing from the map are written as
                # regular files.
                tar_info.type = type_map.get(info.type, tarfile.REGTYPE)
                tar_info.size = info.size
                # NOTE(review): every non-directory resource, including
                # symlinks and other special types, is opened and
                # streamed here - confirm this is intended.
                with src_fs.openbin(path) as bin_file:
                    _tar.addfile(tar_info, bin_file)
|