File: zip_helpers.py

package info (click to toggle)
chromium 138.0.7204.157-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 6,071,864 kB
  • sloc: cpp: 34,936,859; ansic: 7,176,967; javascript: 4,110,704; python: 1,419,953; asm: 946,768; xml: 739,967; pascal: 187,324; sh: 89,623; perl: 88,663; objc: 79,944; sql: 50,304; cs: 41,786; fortran: 24,137; makefile: 21,806; php: 13,980; tcl: 13,166; yacc: 8,925; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (245 lines) | stat: -rw-r--r-- 8,630 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helper functions for dealing with .zip files."""

import os
import pathlib
import posixpath
import stat
import time
import zipfile

# Size in bytes of the fixed-length part of a zip local file header, i.e. the
# part that precedes the variable-length filename and extra fields.
_FIXED_ZIP_HEADER_LEN = 30


def _set_alignment(zip_obj, zip_info, alignment):
  """Sets a ZipInfo's extra field such that the file will be aligned.

  The entry's data begins right after its local file header, which consists
  of a fixed-size part, the encoded filename and the extra field. This pads
  the extra field with NUL bytes so that the data offset is a multiple of
  |alignment|. Any previous value of |zip_info.extra| is replaced.

  Args:
    zip_obj: The ZipFile object that is being written. Its current file
        position is taken as the offset at which the entry's header starts.
    zip_info: The ZipInfo object about to be written.
    alignment: The amount of alignment (e.g. 4, or 4*1024). Must be a
        positive integer.
  """
  # The header stores the filename as bytes (UTF-8 when it is not pure
  # ASCII), so the encoded length must be measured, not the character count.
  # For ASCII names the two are identical.
  encoded_name_len = len(zip_info.filename.encode('utf-8'))
  header_size = _FIXED_ZIP_HEADER_LEN + encoded_name_len
  pos = zip_obj.fp.tell() + header_size
  padding_needed = (alignment - (pos % alignment)) % alignment

  # Python writes |extra| to both the local file header and the central
  # directory's file header. Android's zipalign tool writes only to the
  # local file header, so there is more overhead in using Python to align.
  zip_info.extra = b'\0' * padding_needed


def _hermetic_date_time(timestamp=None):
  """Returns the (year, month, day, hour, minute, second) tuple for an entry.

  Args:
    timestamp: Seconds since the epoch, interpreted as UTC. Any falsy value
        (None or 0) selects the fixed default date instead.

  Returns:
    A 6-tuple suitable for ZipInfo.date_time.
  """
  if timestamp:
    # The first six fields of a struct_time are exactly year..second.
    return time.gmtime(timestamp)[:6]
  # Fixed default so that archives built without a timestamp are reproducible.
  return (2001, 1, 1, 0, 0, 0)


def add_to_zip_hermetic(zip_file,
                        zip_path,
                        *,
                        src_path=None,
                        data=None,
                        compress=None,
                        alignment=None,
                        timestamp=None):
  """Writes one entry into |zip_file| using a deterministic modified time.

  Args:
    zip_file: ZipFile instance to add the file to.
    zip_path: Destination path within the zip file, or a ZipInfo instance
        whose filename is used as the destination path.
    src_path: Path of the source file. Exactly one of |src_path| and |data|
        must be given. Symlinks are stored as symlinks, not followed.
    data: File contents to write when no |src_path| is given.
    compress: Whether to deflate the entry. When None, the compression that
        |zip_file| was constructed with is used. Payloads shorter than 16
        bytes are always stored uncompressed.
    alignment: If set, align the data of the entry to this many bytes.
    timestamp: Unix timestamp for the entry's modification time. A fixed
        date is used when it is omitted.
  """
  assert (src_path is None) != (data is None), (
      '|src_path| and |data| are mutually exclusive.')
  if not isinstance(zip_path, zipfile.ZipInfo):
    zipinfo = zipfile.ZipInfo(filename=zip_path)
    # Fresh entries default to rw-r--r--; the mode is kept in the upper 16
    # bits of external_attr.
    zipinfo.external_attr = 0o644 << 16
  else:
    # Caller supplied a prepared ZipInfo: keep its attributes and validate
    # the name it carries.
    zipinfo = zip_path
    zip_path = zipinfo.filename

  zipinfo.date_time = _hermetic_date_time(timestamp)

  if alignment:
    _set_alignment(zip_file, zipinfo, alignment)

  # Backslashes are legal in filenames, but far more often they mean that a
  # platform path separator leaked into the zip path.
  assert '\\' not in zip_path, 'zip_path should not contain \\: ' + zip_path
  assert not posixpath.isabs(zip_path), 'Absolute zip path: ' + zip_path
  assert not zip_path.startswith('..'), 'Should not start with ..: ' + zip_path
  assert posixpath.normpath(zip_path) == zip_path, (
      f'Non-canonical zip_path: {zip_path} vs: {posixpath.normpath(zip_path)}')
  assert zip_path not in zip_file.namelist(), (
      'Tried to add a duplicate zip entry: ' + zip_path)

  is_symlink = src_path and os.path.islink(src_path)
  if is_symlink:
    # Store the link itself: flag the entry as a symlink and use the link
    # target as its payload.
    zipinfo.external_attr |= stat.S_IFLNK << 16
    zip_file.writestr(zipinfo, os.readlink(src_path))
    return

  if src_path:
    # Carry over only the executable bits of the source file.
    exec_mask = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    source_mode = os.stat(src_path).st_mode
    zipinfo.external_attr |= (source_mode & exec_mask) << 16
    with open(src_path, 'rb') as f:
      data = f.read()

  # zipfile deflates even when that makes the entry larger, so tiny payloads
  # are stored instead. The cut-off is arbitrary.
  if len(data) < 16:
    compress = False

  # An explicit None passed to writestr() would mean ZIP_STORED, so the
  # ZipFile's own default has to be resolved here.
  if compress is None:
    compress_type = zip_file.compression
  elif compress:
    compress_type = zipfile.ZIP_DEFLATED
  else:
    compress_type = zipfile.ZIP_STORED
  zip_file.writestr(zipinfo, data, compress_type)


def add_files_to_zip(inputs,
                     output,
                     *,
                     base_dir=None,
                     path_transform=None,
                     compress=None,
                     zip_prefix_path=None,
                     timestamp=None):
  """Adds a list of files to a zip, in sorted zip-path order.

  Args:
    inputs: A list of paths to zip, or a list of (zip_path, fs_path) tuples.
        Plain string paths get a zip path computed relative to |base_dir|.
    output: Path, fileobj, or ZipFile instance to add files to. A ZipFile
        that is passed in is left open; anything else is opened for writing
        and closed before returning.
    base_dir: Prefix to strip from string inputs. Defaults to '.'.
    path_transform: Called for each entry path (after |zip_prefix_path| has
        been applied). Returns a new zip path, or None to skip the file.
    compress: Whether to compress.
    zip_prefix_path: Path prepended to file path in zip file.
    timestamp: Unix timestamp to use for files in the archive.
  """
  root = '.' if base_dir is None else base_dir
  # Zip files always use / as path separator.
  needs_posix_conversion = os.path.sep != posixpath.sep

  def _as_pair(entry):
    """Turns a bare path into (zip_path, fs_path); passes pairs through."""
    if not isinstance(entry, str):
      return entry
    relative = os.path.relpath(entry, root)
    if needs_posix_conversion:
      relative = str(pathlib.Path(relative).as_posix())
    return (relative, entry)

  # Ordering by zip path keeps the archive layout stable between runs.
  pairs = sorted((_as_pair(entry) for entry in inputs),
                 key=lambda pair: pair[0])

  owns_zip = not isinstance(output, zipfile.ZipFile)
  out_zip = zipfile.ZipFile(output, 'w') if owns_zip else output

  try:
    for zip_path, fs_path in pairs:
      entry_path = zip_path
      if zip_prefix_path:
        entry_path = posixpath.join(zip_prefix_path, entry_path)
      if path_transform:
        entry_path = path_transform(entry_path)
        if entry_path is None:
          continue
      add_to_zip_hermetic(out_zip,
                          entry_path,
                          src_path=fs_path,
                          compress=compress,
                          timestamp=timestamp)
  finally:
    # Only close what was opened here.
    if owns_zip:
      out_zip.close()


def zip_directory(output, base_dir, **kwargs):
  """Zips every file found by walking |base_dir| into |output|.

  Args:
    output: Forwarded unchanged to add_files_to_zip().
    base_dir: Directory to walk. It is also forwarded as the prefix to strip
        from the collected paths.
    **kwargs: Extra keyword arguments forwarded to add_files_to_zip().
  """
  file_paths = [
      os.path.join(dirpath, filename)
      for dirpath, _, filenames in os.walk(base_dir)
      for filename in filenames
  ]

  add_files_to_zip(file_paths, output, base_dir=base_dir, **kwargs)


def merge_zips(output, input_zips, path_transform=None, compress=None):
  """Copies the file entries of every zip in |input_zips| into |output|.

  Directory entries are skipped. An entry whose destination name already
  exists is skipped when its CRC matches the existing entry.

  Args:
    output: Path string or ZipFile instance to add files to. A ZipFile that
        is passed in is left open; a path is opened for writing and closed
        before returning.
    input_zips: Iterable of zip files to merge (must not be a single string).
    path_transform: Called for each entry path. Returns a new zip path, or None
        to skip the file.
    compress: Overrides compression setting from origin zip entries.

  Raises:
    Exception: If the same destination name appears with differing contents.
  """
  assert not isinstance(input_zips, str)  # Easy mistake to make.
  if not isinstance(output, zipfile.ZipFile):
    assert isinstance(output, str), 'Was: ' + repr(output)
    out_filename = output
    out_zip = zipfile.ZipFile(output, 'w')
  else:
    out_filename = output.filename
    out_zip = output

  # Seed with entries already in the output so they count as duplicates too.
  # Maps zip path -> (name of the zip it came from, CRC of its contents).
  crc_by_name = {}
  for existing in out_zip.infolist():
    crc_by_name[existing.filename] = (out_filename, existing.CRC)

  try:
    for in_file in input_zips:
      with zipfile.ZipFile(in_file, 'r') as in_zip:
        for info in in_zip.infolist():
          # Ignore directories.
          if info.filename[-1] == '/':
            continue

          dst_name = (path_transform(info.filename)
                      if path_transform else info.filename)
          if dst_name is None:
            continue

          data = in_zip.read(info)

          # A name seen before is fine only when the contents are identical;
          # in that case the entry is not written a second time.
          previous = crc_by_name.get(dst_name)
          if previous is not None:
            orig_filename, orig_crc = previous
            if zipfile.crc32(data) == orig_crc:
              continue
            msg = f"""File appeared in multiple inputs with differing contents.
File: {dst_name}
Input1: {orig_filename}
Input2: {in_file}"""
            raise Exception(msg)

          # Without an override, keep the entry compressed only if it was
          # compressed in its source zip.
          compress_entry = (compress if compress is not None else
                            info.compress_type != zipfile.ZIP_STORED)
          add_to_zip_hermetic(out_zip,
                              dst_name,
                              data=data,
                              compress=compress_entry)
          crc_by_name[dst_name] = (in_file, out_zip.getinfo(dst_name).CRC)
  finally:
    # Only close what was opened here.
    if out_zip is not output:
      out_zip.close()