1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
|
import contextlib
import os
import pathlib
import shutil
import stat
import sys
import zipfile
# Names exported by a star-import of this module.
__all__ = ['ZipAppError', 'create_archive', 'get_interpreter']
# The __main__.py used if the user specifies "-m module:fn".
# Note that this will always be written as UTF-8 (module and
# function names can be non-ASCII in Python 3).
# We add a coding cookie even though UTF-8 is the default in Python 3
# because the resulting archive may be intended to be run under Python 2.
# The {module} and {fn} placeholders are filled in with str.format().
MAIN_TEMPLATE = """\
# -*- coding: utf-8 -*-
import {module}
{module}.{fn}()
"""
# Encoding used when writing and reading shebang lines.
# The Windows launcher defaults to UTF-8 when parsing shebang lines if the
# file has no BOM, so UTF-8 is used on Windows.  On every other platform
# the filesystem encoding is used.
shebang_encoding = (
    'utf-8' if sys.platform.startswith('win')
    else sys.getfilesystemencoding()
)
class ZipAppError(ValueError):
    """Error raised when an application archive cannot be created."""
@contextlib.contextmanager
def _maybe_open(archive, mode):
if isinstance(archive, (str, os.PathLike)):
with open(archive, mode) as f:
yield f
else:
yield archive
def _write_file_prefix(f, interpreter):
"""Write a shebang line."""
if interpreter:
shebang = b'#!' + interpreter.encode(shebang_encoding) + b'\n'
f.write(shebang)
def _copy_archive(archive, new_archive, interpreter=None):
    """Copy an application archive, modifying the shebang line.

    ARCHIVE and NEW_ARCHIVE may each be a filename (str or os.PathLike) or
    an already-open binary file object.  Any shebang line at the start of
    ARCHIVE is dropped.  If INTERPRETER is given, a new shebang line naming
    it is written at the start of NEW_ARCHIVE; the remaining content of
    ARCHIVE is then copied unchanged.

    When INTERPRETER is given and NEW_ARCHIVE is a filename, the resulting
    file is also marked executable.
    """
    with _maybe_open(archive, 'rb') as src:
        # Skip the shebang line from the source.
        # Read 2 bytes of the source and check if they are #!.
        first_2 = src.read(2)
        if first_2 == b'#!':
            # Discard the initial 2 bytes and the rest of the shebang line.
            first_2 = b''
            src.readline()

        with _maybe_open(new_archive, 'wb') as dst:
            _write_file_prefix(dst, interpreter)
            # If there was no shebang, "first_2" contains the first 2 bytes
            # of the source file, so write them before copying the rest
            # of the file.
            dst.write(first_2)
            shutil.copyfileobj(src, dst)

    # Treat the same types as filenames here that _maybe_open() does, so a
    # pathlib.Path target is made executable just like a str target.
    if interpreter and isinstance(new_archive, (str, os.PathLike)):
        os.chmod(new_archive, os.stat(new_archive).st_mode | stat.S_IEXEC)
def create_archive(source, target=None, interpreter=None, main=None,
                   filter=None, compressed=False):
    """Create an application archive from SOURCE.

    The SOURCE can be the name of a directory, or a filename or a file-like
    object referring to an existing archive.

    The content of SOURCE is packed into an application archive in TARGET,
    which can be a filename or a file-like object.  If SOURCE is a directory,
    TARGET can be omitted and will default to SOURCE with its suffix
    replaced by .pyz (.pyz is appended when SOURCE has no suffix).  TARGET
    is required when SOURCE is an existing archive.

    The created application archive will have a shebang line specifying
    that it should run with INTERPRETER (there will be no shebang line if
    INTERPRETER is None).

    When SOURCE is an existing archive, it is copied with only the shebang
    line changed; MAIN, FILTER and COMPRESSED are not applied.

    When SOURCE is a directory, the archive gets a __main__.py which runs
    MAIN, given as "module:fn" (if MAIN is not specified, an existing
    __main__.py will be used).  It is an error to specify MAIN if the
    directory already has a __main__.py, and it is an error to omit MAIN
    if the directory has no __main__.py.

    FILTER, if given, is called with the path of each entry relative to
    SOURCE (a pathlib.Path); only entries for which it returns a true value
    are added.  A __main__.py generated from MAIN is always added.

    If COMPRESSED is true, entries are compressed with the deflate method;
    otherwise they are stored uncompressed.

    Raises ZipAppError if the directory SOURCE does not exist, if the entry
    point is missing, duplicated or malformed, if TARGET is one of the files
    to be added, or if TARGET is omitted when copying an existing archive.
    """
    # Are we copying an existing archive?
    source_is_file = False
    if hasattr(source, 'read') and hasattr(source, 'readline'):
        source_is_file = True
    else:
        source = pathlib.Path(source)
        if source.is_file():
            source_is_file = True

    if source_is_file:
        # There is no sensible default output name for a copy; fail with a
        # clear message instead of an AttributeError on None further down.
        if target is None:
            raise ZipAppError(
                "A target must be specified when copying an existing archive")
        _copy_archive(source, target, interpreter)
        return

    # We are creating a new archive from a directory.
    if not source.exists():
        raise ZipAppError("Source does not exist")
    has_main = (source / '__main__.py').is_file()
    if main and has_main:
        raise ZipAppError(
            "Cannot specify entry point if the source has __main__.py")
    if not (main or has_main):
        raise ZipAppError("Archive has no entry point")

    main_py = None
    if main:
        # Check that main has the right format.
        mod, sep, fn = main.partition(':')
        mod_ok = all(part.isidentifier() for part in mod.split('.'))
        fn_ok = all(part.isidentifier() for part in fn.split('.'))
        if not (sep == ':' and mod_ok and fn_ok):
            raise ZipAppError("Invalid entry point: " + main)
        main_py = MAIN_TEMPLATE.format(module=mod, fn=fn)

    if target is None:
        target = source.with_suffix('.pyz')
    elif not hasattr(target, 'write'):
        target = pathlib.Path(target)

    # Create the list of files to add to the archive now, in case
    # the target is being created in the source directory - we
    # don't want the target being added to itself
    files_to_add = sorted(source.rglob('*'))

    # The target cannot be in the list of files to add. If it were, we'd
    # end up overwriting the source file and writing the archive into
    # itself, which is an error. We therefore check for that case and
    # provide a helpful message for the user.

    # Note that we only do a simple path equality check. This won't
    # catch every case, but it will catch the common case where the
    # source is the CWD and the target is a file in the CWD. More
    # thorough checks don't provide enough value to justify the extra
    # cost.

    # If target is a file-like object, it will simply fail to compare
    # equal to any of the entries in files_to_add, so there's no need
    # to add a special check for that.
    if target in files_to_add:
        raise ZipAppError(
            f"The target archive {target} overwrites one of the source files.")

    with _maybe_open(target, 'wb') as fd:
        _write_file_prefix(fd, interpreter)
        compression = (zipfile.ZIP_DEFLATED if compressed else
                       zipfile.ZIP_STORED)
        with zipfile.ZipFile(fd, 'w', compression=compression) as z:
            for child in files_to_add:
                arcname = child.relative_to(source)
                if filter is None or filter(arcname):
                    z.write(child, arcname.as_posix())
            if main_py:
                z.writestr('__main__.py', main_py.encode('utf-8'))

    # Only a target we opened by name can be made executable.
    if interpreter and not hasattr(target, 'write'):
        target.chmod(target.stat().st_mode | stat.S_IEXEC)
def get_interpreter(archive):
    """Return the interpreter named in ARCHIVE's shebang line, or None.

    ARCHIVE may be a filename or an open binary file object.  If its first
    two bytes are not "#!", None is returned.  Otherwise the rest of the
    first line is stripped of surrounding whitespace and decoded with the
    module-level shebang_encoding.
    """
    with _maybe_open(archive, 'rb') as f:
        if f.read(2) != b'#!':
            return None
        shebang_rest = f.readline()
        return shebang_rest.strip().decode(shebang_encoding)
def main(args=None):
    """Run the zipapp command line interface.

    The ARGS parameter lets you specify the argument list directly.
    Omitting ARGS (or setting it to None) works as for argparse, using
    sys.argv[1:] as the argument list.

    Exits via SystemExit with an error message when --info is used on
    something that is not a file, or when an existing archive would be
    edited in place or given a new main function.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--output', '-o', default=None,
                        help="The name of the output archive. "
                             "Required if SOURCE is an archive.")
    parser.add_argument('--python', '-p', default=None,
                        help="The name of the Python interpreter to use "
                             "(default: no shebang line).")
    parser.add_argument('--main', '-m', default=None,
                        help="The main function of the application "
                             "(default: use an existing __main__.py).")
    parser.add_argument('--compress', '-c', action='store_true',
                        help="Compress files with the deflate method. "
                             "Files are stored uncompressed by default.")
    parser.add_argument('--info', default=False, action='store_true',
                        help="Display the interpreter from the archive.")
    parser.add_argument('source',
                        help="Source directory (or existing archive).")

    options = parser.parse_args(args)
    source = options.source
    output = options.output

    # Handle `python -m zipapp archive.pyz --info`.
    if options.info:
        if not os.path.isfile(source):
            raise SystemExit("Can only get info for an archive file")
        interpreter = get_interpreter(source)
        print("Interpreter: {}".format(interpreter or "<none>"))
        sys.exit(0)

    # Copying an existing archive: an output distinct from the source is
    # required, and the entry point cannot be changed.
    if os.path.isfile(source):
        if output is None:
            raise SystemExit("In-place editing of archives is not supported")
        if os.path.exists(output) and os.path.samefile(source, output):
            raise SystemExit("In-place editing of archives is not supported")
        if options.main:
            raise SystemExit("Cannot change the main function when copying")

    create_archive(source, output,
                   interpreter=options.python, main=options.main,
                   compressed=options.compress)
# Run the command line interface when this file is executed as a script.
if __name__ == '__main__':
    main()
|