1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
|
# -*- encoding: UTF-8
from __future__ import unicode_literals
import sys
import os
import six
import tempfile
import unittest
import zipfile
from fs import zipfs
from fs.compress import write_zip
from fs.enums import Seek
from fs.errors import NoURL
from fs.opener import open_fs
from fs.opener.errors import NotWriteable
from fs.test import FSTestCases
from .test_archives import ArchiveTestCases
class TestWriteReadZipFS(unittest.TestCase):
def setUp(self):
fh, self._temp_path = tempfile.mkstemp()
os.close(fh)
def tearDown(self):
os.remove(self._temp_path)
def test_unicode_paths(self):
# https://github.com/PyFilesystem/pyfilesystem2/issues/135
with zipfs.ZipFS(self._temp_path, write=True) as zip_fs:
zip_fs.writetext("Файл", "some content")
with zipfs.ZipFS(self._temp_path) as zip_fs:
paths = list(zip_fs.walk.files())
for path in paths:
self.assertIsInstance(path, six.text_type)
with zip_fs.openbin(path) as f:
f.read()
class TestWriteZipFS(FSTestCases, unittest.TestCase):
"""
Test ZIPFS implementation.
When writing, a ZipFS is essentially a TempFS.
"""
def make_fs(self):
_zip_file = tempfile.TemporaryFile()
fs = zipfs.ZipFS(_zip_file, write=True)
fs._zip_file = _zip_file
return fs
def destroy_fs(self, fs):
fs.close()
del fs._zip_file
class TestReadZipFS(ArchiveTestCases, unittest.TestCase):
"""
Test Reading zip files.
"""
def compress(self, fs):
fh, self._temp_path = tempfile.mkstemp()
os.close(fh)
write_zip(fs, self._temp_path)
def load_archive(self):
return zipfs.ZipFS(self._temp_path)
def remove_archive(self):
os.remove(self._temp_path)
def test_large(self):
test_fs = open_fs("mem://")
test_fs.writebytes("test.bin", b"a" * 50000)
write_zip(test_fs, self._temp_path)
self.fs = self.load_archive()
with self.fs.openbin("test.bin") as f:
self.assertEqual(f.read(), b"a" * 50000)
with self.fs.openbin("test.bin") as f:
self.assertEqual(f.read(50000), b"a" * 50000)
with self.fs.openbin("test.bin") as f:
self.assertEqual(f.read1(), b"a" * 50000)
with self.fs.openbin("test.bin") as f:
self.assertEqual(f.read1(50000), b"a" * 50000)
def test_getinfo(self):
super(TestReadZipFS, self).test_getinfo()
top = self.fs.getinfo("top.txt", ["zip"])
if sys.platform in ("linux", "darwin"):
self.assertEqual(top.get("zip", "create_system"), 3)
def test_openbin(self):
with self.fs.openbin("top.txt") as f:
self.assertEqual(f.name, "top.txt")
with self.fs.openbin("top.txt") as f:
self.assertRaises(ValueError, f.seek, -2, Seek.set)
with self.fs.openbin("top.txt") as f:
self.assertRaises(ValueError, f.seek, 2, Seek.end)
with self.fs.openbin("top.txt") as f:
self.assertRaises(ValueError, f.seek, 0, 5)
def test_read(self):
with self.fs.openbin("top.txt") as f:
self.assertEqual(f.read(), b"Hello, World")
with self.fs.openbin("top.txt") as f:
self.assertEqual(f.read(5), b"Hello")
self.assertEqual(f.read(7), b", World")
with self.fs.openbin("top.txt") as f:
self.assertEqual(f.read(12), b"Hello, World")
def test_read1(self):
with self.fs.openbin("top.txt") as f:
self.assertEqual(f.read1(), b"Hello, World")
with self.fs.openbin("top.txt") as f:
self.assertEqual(f.read1(5), b"Hello")
self.assertEqual(f.read1(7), b", World")
with self.fs.openbin("top.txt") as f:
self.assertEqual(f.read1(12), b"Hello, World")
def test_seek_set(self):
with self.fs.openbin("top.txt") as f:
self.assertEqual(f.tell(), 0)
self.assertEqual(f.read(), b"Hello, World")
self.assertEqual(f.tell(), 12)
self.assertEqual(f.read(), b"")
self.assertEqual(f.tell(), 12)
self.assertEqual(f.seek(0), 0)
self.assertEqual(f.tell(), 0)
self.assertEqual(f.read1(), b"Hello, World")
self.assertEqual(f.tell(), 12)
self.assertEqual(f.seek(1), 1)
self.assertEqual(f.tell(), 1)
self.assertEqual(f.read(), b"ello, World")
self.assertEqual(f.tell(), 12)
self.assertEqual(f.seek(7), 7)
self.assertEqual(f.tell(), 7)
self.assertEqual(f.read(), b"World")
self.assertEqual(f.tell(), 12)
def test_seek_current(self):
with self.fs.openbin("top.txt") as f:
self.assertEqual(f.tell(), 0)
self.assertEqual(f.read(5), b"Hello")
self.assertEqual(f.tell(), 5)
self.assertEqual(f.seek(2, Seek.current), 7)
self.assertEqual(f.read1(), b"World")
self.assertEqual(f.tell(), 12)
self.assertEqual(f.seek(-1, Seek.current), 11)
self.assertEqual(f.read(), b"d")
with self.fs.openbin("top.txt") as f:
self.assertRaises(ValueError, f.seek, -1, Seek.current)
def test_seek_end(self):
with self.fs.openbin("top.txt") as f:
self.assertEqual(f.tell(), 0)
self.assertEqual(f.seek(-12, Seek.end), 0)
self.assertEqual(f.read1(5), b"Hello")
self.assertEqual(f.seek(-7, Seek.end), 5)
self.assertEqual(f.seek(-5, Seek.end), 7)
self.assertEqual(f.read(), b"World")
def test_geturl_for_fs(self):
test_file = "foo/bar/egg/foofoo"
expected = "zip://{zip_file_path}!/{file_inside_zip}".format(
zip_file_path=self._temp_path.replace("\\", "/"), file_inside_zip=test_file
)
self.assertEqual(self.fs.geturl(test_file, purpose="fs"), expected)
def test_geturl_for_fs_but_file_is_binaryio(self):
self.fs._file = six.BytesIO()
self.assertRaises(NoURL, self.fs.geturl, "test", "fs")
def test_geturl_for_download(self):
test_file = "foo/bar/egg/foofoo"
with self.assertRaises(NoURL):
self.fs.geturl(test_file)
def test_read_non_existent_file(self):
fs = zipfs.ZipFS(open(self._temp_path, "rb"))
# it has been very difficult to catch exception in __del__()
del fs._zip
try:
fs.close()
except AttributeError:
self.fail("Could not close tar fs properly")
except Exception:
self.fail("Strange exception in closing fs")
class TestReadZipFSMem(TestReadZipFS):
def make_source_fs(self):
return open_fs("mem://")
class TestDirsZipFS(unittest.TestCase):
def test_implied(self):
"""Test zipfs creates intermediate directories."""
fh, path = tempfile.mkstemp("testzip.zip")
try:
os.close(fh)
with zipfile.ZipFile(path, mode="w") as z:
z.writestr("foo/bar/baz/egg", b"hello")
with zipfs.ReadZipFS(path) as zip_fs:
foo = zip_fs.getinfo("foo", ["details"])
self.assertEqual(zip_fs.getinfo("foo/bar").name, "bar")
self.assertEqual(zip_fs.getinfo("foo/bar/baz").name, "baz")
self.assertTrue(foo.is_dir)
self.assertTrue(zip_fs.isfile("foo/bar/baz/egg"))
finally:
os.remove(path)
class TestOpener(unittest.TestCase):
def test_not_writeable(self):
with self.assertRaises(NotWriteable):
open_fs("zip://foo.zip", writeable=True)
|