1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
|
# coding: utf-8
from __future__ import division, unicode_literals
import io
import zipfile
import contextlib
import tempfile
import shutil
import string
try:
import pathlib
except ImportError:
import pathlib2 as pathlib
if not hasattr(contextlib, 'ExitStack'):
import contextlib2
contextlib.ExitStack = contextlib2.ExitStack
try:
import unittest
unittest.TestCase.subTest
except AttributeError:
import unittest2 as unittest
import jaraco.itertools
import func_timeout
import zipp
# On Python 2, make every class defined in this module new-style;
# the assignment has no effect on Python 3.
__metaclass__ = type
# Helper alias: building a tuple from an iterable exhausts it.
consume = tuple
def add_dirs(zf):
    """
    Write an empty entry into the writable zip file ``zf`` for each
    directory name that ``zipp.CompleteDirs._implied_dirs`` reports
    for the archive's current name list, then return ``zf`` itself.
    """
    existing_names = zf.namelist()
    implied = zipp.CompleteDirs._implied_dirs(existing_names)
    for dirname in implied:
        zf.writestr(dirname, b"")
    return zf
def build_alpharep_fixture():
    """
    Build an in-memory, writable zip file laid out like this:

    .
    ├── a.txt
    ├── b
    │   ├── c.txt
    │   ├── d
    │   │   └── e.txt
    │   └── f.txt
    └── g
        └── h
            └── i.txt

    The layout deliberately covers:

    - a file at the root (a)
    - a file nested two directories deep (b/d/e)
    - several files sharing one directory (b/c, b/f)
    - a directory whose only child is another directory (g/h)

    Only file entries are written here; no explicit directory
    entries are added. The archive is returned still open in "w"
    mode, backed by a BytesIO, with ``filename`` set to
    "alpharep.zip".

    "alpha" because the names follow the alphabet,
    "rep" because it is a representative example.
    """
    # Order matters: it determines the archive's name list.
    members = (
        ("a.txt", b"content of a"),
        ("b/c.txt", b"content of c"),
        ("b/d/e.txt", b"content of e"),
        ("b/f.txt", b"content of f"),
        ("g/h/i.txt", b"content of i"),
    )
    archive = zipfile.ZipFile(io.BytesIO(), "w")
    for member_name, payload in members:
        archive.writestr(member_name, payload)
    archive.filename = "alpharep.zip"
    return archive
@contextlib.contextmanager
def temp_dir():
    """
    Context manager that creates a temporary directory and yields it
    as a ``pathlib.Path``. The directory tree is removed when the
    managed block exits, whether normally or through an exception.
    """
    location = tempfile.mkdtemp()
    try:
        yield pathlib.Path(location)
    finally:
        # Always clean up, even if the managed block raised.
        shutil.rmtree(location)
class TestPath(unittest.TestCase):
    """
    Tests for ``zipp.Path``, run against the "alpharep" fixture both
    as built (directories only implied) and with explicit directory
    entries added.
    """

    def setUp(self):
        # Contexts entered on this stack (such as temporary
        # directories) are closed during test cleanup.
        self.fixtures = contextlib.ExitStack()
        self.addCleanup(self.fixtures.close)

    def zipfile_alpharep(self):
        """
        Yield the alpharep fixture twice: first as built, then with
        explicit directory entries injected by ``add_dirs``.
        """
        with self.subTest():
            yield build_alpharep_fixture()
        with self.subTest():
            yield add_dirs(build_alpharep_fixture())

    def zipfile_ondisk(self):
        """
        Yield a filesystem path to each alpharep variant, written
        into a temporary directory that lives until test cleanup.
        """
        # temp_dir() already yields a pathlib.Path.
        tmpdir = self.fixtures.enter_context(temp_dir())
        for alpharep in self.zipfile_alpharep():
            # Keep a reference to the underlying buffer before closing
            # the archive, then dump the finished bytes to disk.
            buffer = alpharep.fp
            alpharep.close()
            path = tmpdir / alpharep.filename
            with path.open("wb") as strm:
                strm.write(buffer.getvalue())
            yield path

    def test_iterdir_and_types(self):
        """Walk the whole fixture and check file/dir classification."""
        for alpharep in self.zipfile_alpharep():
            root = zipp.Path(alpharep)
            assert root.is_dir()
            a, b, g = root.iterdir()
            assert a.is_file()
            assert b.is_dir()
            assert g.is_dir()
            c, f, d = b.iterdir()
            assert c.is_file() and f.is_file()
            e, = d.iterdir()
            assert e.is_file()
            h, = g.iterdir()
            i, = h.iterdir()
            assert i.is_file()

    def test_open(self):
        """Opening a file entry with default arguments yields its text."""
        for alpharep in self.zipfile_alpharep():
            root = zipp.Path(alpharep)
            a, b, g = root.iterdir()
            with a.open() as strm:
                data = strm.read()
            assert data == "content of a"

    def test_read(self):
        """read_text and read_bytes return the entry's content."""
        for alpharep in self.zipfile_alpharep():
            root = zipp.Path(alpharep)
            a, b, g = root.iterdir()
            assert a.read_text() == "content of a"
            assert a.read_bytes() == b"content of a"

    def test_joinpath(self):
        """joinpath resolves both direct children and nested entries."""
        for alpharep in self.zipfile_alpharep():
            root = zipp.Path(alpharep)
            # Use a name the fixture actually contains, so the
            # assertion does not depend on is_file() accepting a
            # path with no matching entry.
            a = root.joinpath("a.txt")
            assert a.is_file()
            e = root.joinpath("b").joinpath("d").joinpath("e.txt")
            assert e.read_text() == "content of e"

    def test_traverse_truediv(self):
        """The ``/`` operator traverses the same way joinpath does."""
        for alpharep in self.zipfile_alpharep():
            root = zipp.Path(alpharep)
            # Use a name the fixture actually contains (see
            # build_alpharep_fixture).
            a = root / "a.txt"
            assert a.is_file()
            e = root / "b" / "d" / "e.txt"
            assert e.read_text() == "content of e"

    def test_traverse_simplediv(self):
        """
        Disable the __future__.division when testing traversal.
        """
        for alpharep in self.zipfile_alpharep():
            # dont_inherit=True keeps this module's __future__
            # imports from applying to the compiled expression.
            code = compile(
                source="zipp.Path(alpharep) / 'a'",
                filename="(test)",
                mode="eval",
                dont_inherit=True,
            )
            eval(code)

    def test_pathlike_construction(self):
        """
        zipp.Path should be constructable from a path-like object
        """
        for zipfile_ondisk in self.zipfile_ondisk():
            pathlike = pathlib.Path(str(zipfile_ondisk))
            zipp.Path(pathlike)

    def test_traverse_pathlike(self):
        """Traversal with ``/`` accepts a path-like right operand."""
        for alpharep in self.zipfile_alpharep():
            root = zipp.Path(alpharep)
            root / pathlib.Path("a")

    def test_parent(self):
        """``parent`` of a child path points at the containing dir."""
        for alpharep in self.zipfile_alpharep():
            root = zipp.Path(alpharep)
            assert (root / 'a').parent.at == ''
            assert (root / 'a' / 'b').parent.at == 'a/'

    def test_dir_parent(self):
        """``parent`` of a directory is the same with or without '/'."""
        for alpharep in self.zipfile_alpharep():
            root = zipp.Path(alpharep)
            assert (root / 'b').parent.at == ''
            assert (root / 'b/').parent.at == ''

    def test_missing_dir_parent(self):
        """``parent`` works for a directory absent from the archive."""
        for alpharep in self.zipfile_alpharep():
            root = zipp.Path(alpharep)
            assert (root / 'missing dir/').parent.at == ''

    def test_mutability(self):
        """
        If the underlying zipfile is changed, the Path object should
        reflect that change.
        """
        for alpharep in self.zipfile_alpharep():
            root = zipp.Path(alpharep)
            # Iterate once before mutating the archive.
            a, b, g = root.iterdir()
            alpharep.writestr('foo.txt', b'foo')
            alpharep.writestr('bar/baz.txt', b'baz')
            assert any(
                child.name == 'foo.txt'
                for child in root.iterdir())
            assert (root / 'foo.txt').read_text() == 'foo'
            baz, = (root / 'bar').iterdir()
            assert baz.read_text() == 'baz'

    HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13

    def huge_zipfile(self):
        """Create a read-only zipfile with a huge number of entries."""
        strm = io.BytesIO()
        zf = zipfile.ZipFile(strm, "w")
        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
            zf.writestr(entry, entry)
        # Flip the mode attribute so the archive presents as read-only.
        zf.mode = 'r'
        return zf

    def test_joinpath_constant_time(self):
        """
        Call joinpath on every entry of a huge zipfile.

        The intent is that each call is constant time, so the loop is
        linear overall. No time limit is enforced here; the test only
        checks that every entry was visited.
        """
        root = zipp.Path(self.huge_zipfile())
        entries = jaraco.itertools.Counter(root.iterdir())
        for entry in entries:
            entry.joinpath('suffix')
        # Check the file iterated all items
        assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES

    @func_timeout.func_set_timeout(3)
    def test_implied_dirs_performance(self):
        """
        Computing implied dirs for 10000 deeply nested names must
        finish within the timeout set by the decorator.
        """
        data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
        # Exhaust the result so that the work is performed inside the
        # timeout even if _implied_dirs returns a lazy iterable.
        consume(zipp.CompleteDirs._implied_dirs(data))
|