From: Hiroshi Miura <miurahr@linux.com>
Date: Sun, 30 Oct 2022 14:27:46 +0900
Subject: Fix sanity check for path traversal attack

Forwarded: not-needed
Bug-Debian: https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=1032091
Origin: backport, https://github.com/miurahr/py7zr/commit/1bb43f17515c7f69673a1c88ab9cc72a7bbef406
Author: Hiroshi Miura <miurahr@linux.com>

- Previous versions do not detect the attack in some case
   - fixed it by call resolve()
   - resolve() converts "/hoge/fuga/../../../tmp/evil.sh" to be "/tmp/evil.sh" then
     relative_to() can detect path traversal attack.
- Add path checker in writef() and writestr() methods
  - When pass arcname as evil path such as "../../../../tmp/evil.sh"
    it raises ValueError
- Add test case of bad path detection
- extraction: check symlink and junction is under target folder
- Fix relative_path_marker removal
- Don't put windows file namespace to output file path

Signed-off-by: Hiroshi Miura <miurahr@linux.com>
---
 MANIFEST.in           |   1 -
 py7zr/helpers.py      | 134 +++++++++++++++++++++++++++++++++++++++++++++-
 py7zr/py7zr.py        | 145 +++++++++++++++++++++++++++-----------------------
 py7zr/win32compat.py  |  13 +++++
 tests/test_extract.py |   2 +-
 tests/test_zipslip.py |  54 +++++++++++++++++++
 6 files changed, 279 insertions(+), 70 deletions(-)
 create mode 100644 tests/test_zipslip.py

diff --git a/MANIFEST.in b/MANIFEST.in
index 95d2263..63b919e 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -3,7 +3,6 @@ include *.svg
 include *.toml
 include *.yml
 include LICENSE
-include .coveragerc
 include tox.ini
 recursive-include py7zr *.py
 recursive-include tests *.py
diff --git a/py7zr/helpers.py b/py7zr/helpers.py
index 8da7218..89ec89c 100644
--- a/py7zr/helpers.py
+++ b/py7zr/helpers.py
@@ -37,6 +37,11 @@ import _hashlib  # type: ignore  # noqa
 
 import py7zr.win32compat
 from py7zr.properties import READ_BLOCKSIZE
+from py7zr import Bad7zFile
+from py7zr.win32compat import is_windows_native_python, is_windows_unc_path
+
+# String used at the beginning of relative paths
+RELATIVE_PATH_MARKER = "./"
 
 
 def calculate_crc32(data: bytes, value: int = 0, blocksize: int = 1024 * 1024) -> int:
@@ -249,7 +254,7 @@ class ArchiveTimestamp(int):
 def islink(path):
     """
     Cross-platform islink implementation.
-    Supports Windows NT symbolic links and reparse points.
+    Support Windows NT symbolic links and reparse points.
     """
     is_symlink = os.path.islink(str(path))
     if sys.version_info >= (3, 8) or sys.platform != "win32" or sys.getwindowsversion()[0] < 6:
@@ -264,7 +269,7 @@ def islink(path):
 def readlink(path: Union[str, pathlib.Path], *, dir_fd=None) -> Union[str, pathlib.Path]:
     """
     Cross-platform compat implementation of os.readlink and Path.readlink().
-    Supports Windows NT symbolic links and reparse points.
+    Support Windows NT symbolic links and reparse points.
     When called with path argument as pathlike(str), return result as a pathlike(str).
     When called with Path object, return also Path object.
     When called with path argument as bytes, return result as a bytes.
@@ -447,3 +452,128 @@ class BufferedRW(io.BufferedIOBase):
 
     def __len__(self):
         return len(self._buf)
+
+
+def remove_relative_path_marker(path: str) -> str:
+    """
+    Removes './' from the beginning of a path-like string
+    """
+    processed_path = path
+
+    if path.startswith(RELATIVE_PATH_MARKER):
+        processed_path = path[len(RELATIVE_PATH_MARKER) :]
+
+    return processed_path
+
+
+def canonical_path(target: pathlib.PurePath) -> pathlib.PurePath:
+    """Return a canonical path of target argument."""
+    stack: List[str] = []
+    for p in target.parts:
+        if p != ".." or len(stack) == 0:
+            stack.append(p)
+            continue
+        # treat '..'
+        if stack[-1] == "..":
+            stack.append(p)  # '../' + '../' -> '../../'
+        elif stack[-1] == "/":
+            pass  # '/' + '../' -> '/'
+        else:
+            stack.pop()  # 'foo/boo/' + '..' -> 'foo/'
+    return pathlib.PurePath(*stack)
+
+
+def is_relative_to(my: pathlib.PurePath, *other) -> bool:
+    """Return True when path is relative to other path, otherwise False."""
+    try:
+        my.relative_to(*other)
+    except ValueError:
+        return False
+    return True
+
+
+def get_sanitized_output_path(fname: str, path: Optional[pathlib.Path]) -> pathlib.Path:
+    """
+    check f.filename has invalid directory traversals
+    When condition is not satisfied, raise Bad7zFile
+    """
+    if path is None:
+        target_path = canonical_path(pathlib.Path.cwd().joinpath(fname))
+        if is_relative_to(target_path, pathlib.Path.cwd()):
+            return pathlib.Path(remove_relative_path_marker(fname))
+    else:
+        outfile = canonical_path(path.joinpath(remove_relative_path_marker(fname)))
+        if is_relative_to(outfile, path):
+            return pathlib.Path(outfile)
+    raise Bad7zFile(f"Specified path is bad: {fname}")
+
+
+def check_archive_path(arcname: str) -> bool:
+    """
+    Check arcname argument is valid for archive.
+    It should not be absolute, if so it returns False.
+    It should not be evil traversal attack path.
+    Otherwise, returns True.
+    """
+    if pathlib.PurePath(arcname).is_absolute():
+        return False
+    # test against dummy parent path
+    if sys.platform == "win32":
+        path = pathlib.Path("C:/foo/boo/fuga/hoge/a90sufoiasj09/dafj08sajfa/")
+    else:
+        path = pathlib.Path("/foo/boo/fuga/hoge/a90sufoiasj09/dafj08sajfa/")
+    return is_path_valid(path.joinpath(arcname), path)
+
+
+def get_sanitized_output_path(fname: str, path: Optional[pathlib.Path]) -> pathlib.Path:
+    """
+    check f.filename has invalid directory traversals
+    do following but is_relative_to introduced in py 3.9,
+    so I replaced it with relative_to. when condition is not satisfied, raise ValueError
+    if not pathlib.Path(...).joinpath(remove_relative_path_marker(outname)).is_relative_to(...):
+        raise Bad7zFile
+    """
+    if path is None:
+        try:
+            pathlib.Path(os.getcwd()).joinpath(fname).resolve().relative_to(os.getcwd())
+            outfile = pathlib.Path(remove_relative_path_marker(fname))
+        except ValueError:
+            raise Bad7zFile(f"Specified path is bad: {fname}")
+    else:
+        try:
+            outfile = path.joinpath(remove_relative_path_marker(fname))
+            outfile.resolve().relative_to(path)
+        except ValueError:
+            raise Bad7zFile(f"Specified path is bad: {fname}")
+    return outfile
+
+
+def check_archive_path(arcname: str) -> bool:
+    path = pathlib.Path("/foo/boo/fuga/hoge/a90sufoiasj09/dafj08sajfa/")  # dummy path
+    return is_target_path_valid(path, path.joinpath(arcname))
+
+
+def is_target_path_valid(path: pathlib.Path, target: pathlib.Path) -> bool:
+    try:
+        if path.is_absolute():
+            target.resolve().relative_to(path)
+        else:
+            target.resolve().relative_to(pathlib.Path(os.getcwd()).joinpath(path))
+    except ValueError:
+        return False
+    return True
+
+
+def check_win32_file_namespace(pathname: pathlib.Path) -> pathlib.Path:
+    # When python on Windows and not python on Cygwin,
+    # Add win32 file namespace to exceed Microsoft Windows
+    # path length limitation to 260 bytes
+    # ref.
+    # https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file
+    # In editions of Windows before Windows 10 version 1607,
+    # the maximum length for a path is MAX_PATH, which is defined as
+    # 260 characters. In later versions of Windows, changing a registry key
+    # or select option when python installation is required to remove the limit.
+    if is_windows_native_python() and pathname.is_absolute() and not is_windows_unc_path(pathname):
+        pathname = pathlib.WindowsPath("\\\\?\\" + str(pathname))
+    return pathname
diff --git a/py7zr/py7zr.py b/py7zr/py7zr.py
index 706f1f8..fbc899e 100644
--- a/py7zr/py7zr.py
+++ b/py7zr/py7zr.py
@@ -41,7 +41,7 @@ from py7zr.archiveinfo import Folder, Header, SignatureHeader
 from py7zr.callbacks import ExtractCallback
 from py7zr.compressor import SupportedMethods, get_methods_names_string
 from py7zr.exceptions import Bad7zFile, CrcError, DecompressionError, InternalError, UnsupportedCompressionMethodError
-from py7zr.helpers import ArchiveTimestamp, MemIO, NullIO, calculate_crc32, filetime_to_dt, readlink
+from py7zr.helpers import ArchiveTimestamp, MemIO, NullIO, calculate_crc32, check_archive_path, filetime_to_dt, get_sanitized_output_path, is_target_path_valid, readlink
 from py7zr.properties import ARCHIVE_DEFAULT, ENCRYPTED_ARCHIVE_DEFAULT, MAGIC_7Z, READ_BLOCKSIZE
 
 if sys.platform.startswith('win'):
@@ -483,14 +483,10 @@ class SevenZipFile(contextlib.AbstractContextManager):
                     if outname not in fnames:
                         break
             fnames.append(outname)
-            if path is not None:
-                outfilename = path.joinpath(outname)
+            if path is None or path.is_absolute():
+                outfilename = get_sanitized_output_path(outname, path)
             else:
-                outfilename = pathlib.Path(outname)
-            if os.name == 'nt':
-                if outfilename.is_absolute():
-                    # hack for microsoft windows path length limit < 255
-                    outfilename = pathlib.WindowsPath('\\\\?\\' + str(outfilename))
+                outfilename = get_sanitized_output_path(outname, pathlib.Path(os.getcwd()).joinpath(path))
             if targets is not None and f.filename not in targets:
                 self.worker.register_filelike(f.id, None)
                 continue
@@ -535,52 +531,35 @@ class SevenZipFile(contextlib.AbstractContextManager):
 
         try:
             if callback is not None:
-                self.worker.extract(self.fp, parallel=(not self.password_protected and not self._filePassed), q=self.q)
+                self.worker.extract(self.fp, path, parallel=(not self.password_protected and not self._filePassed), q=self.q)
             else:
-                self.worker.extract(self.fp, parallel=(not self.password_protected and not self._filePassed))
+                self.worker.extract(self.fp, path, parallel=(not self.password_protected and not self._filePassed))
         except CrcError as ce:
             raise Bad7zFile("CRC32 error on archived file {}.".format(str(ce)))
 
         self.q.put(('post', None, None))
         if return_dict:
             return self._dict
-        else:
-            # create symbolic links on target path as a working directory.
-            # if path is None, work on current working directory.
-            for t in target_sym:
-                sym_dst = t.resolve()
-                with sym_dst.open('rb') as b:
-                    sym_src = b.read().decode(encoding='utf-8')  # symlink target name stored in utf-8
-                sym_dst.unlink()  # unlink after close().
-                sym_dst.symlink_to(pathlib.Path(sym_src))
-            # create junction point only on windows platform
-            if sys.platform.startswith('win'):
-                for t in target_junction:
-                    junction_dst = t.resolve()
-                    with junction_dst.open('rb') as b:
-                        junction_target = pathlib.Path(b.read().decode(encoding='utf-8'))
-                        junction_dst.unlink()
-                        _winapi.CreateJunction(junction_target, str(junction_dst))  # type: ignore  # noqa
-            # set file properties
-            for outfilename, properties in target_files:
-                # mtime
-                lastmodified = None
-                try:
-                    lastmodified = ArchiveTimestamp(properties['lastwritetime']).totimestamp()
-                except KeyError:
-                    pass
-                if lastmodified is not None:
-                    os.utime(str(outfilename), times=(lastmodified, lastmodified))
-                if os.name == 'posix':
-                    st_mode = properties['posix_mode']
-                    if st_mode is not None:
-                        outfilename.chmod(st_mode)
-                        continue
-                # fallback: only set readonly if specified
-                if properties['readonly'] and not properties['is_directory']:
-                    ro_mask = 0o777 ^ (stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)
-                    outfilename.chmod(outfilename.stat().st_mode & ro_mask)
-            return None
+        # set file properties
+        for outfilename, properties in target_files:
+            # mtime
+            lastmodified = None
+            try:
+                lastmodified = ArchiveTimestamp(properties['lastwritetime']).totimestamp()
+            except KeyError:
+                pass
+            if lastmodified is not None:
+                os.utime(str(outfilename), times=(lastmodified, lastmodified))
+            if os.name == 'posix':
+                st_mode = properties['posix_mode']
+                if st_mode is not None:
+                    outfilename.chmod(st_mode)
+                    continue
+            # fallback: only set readonly if specified
+            if properties['readonly'] and not properties['is_directory']:
+                ro_mask = 0o777 ^ (stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)
+                outfilename.chmod(outfilename.stat().st_mode & ro_mask)
+        return None
 
     def _prepare_append(self, filters, password):
         if password is not None and filters is None:
@@ -940,7 +919,7 @@ class SevenZipFile(contextlib.AbstractContextManager):
         for f in self.files:
             self.worker.register_filelike(f.id, None)
         try:
-            self.worker.extract(self.fp, parallel=(not self.password_protected), skip_notarget=False)  # TODO: print progress
+            self.worker.extract(self.fp, None, parallel=(not self.password_protected), skip_notarget=False)  # TODO: print progress
         except CrcError as crce:
             return str(crce)
         else:
@@ -998,28 +977,28 @@ class Worker:
         self.current_file_index = len(self.files)
         self.last_file_index = len(self.files)
 
-    def extract(self, fp: BinaryIO, parallel: bool, skip_notarget=True, q=None) -> None:
+    def extract(self, fp: BinaryIO, path: Optional[pathlib.Path], parallel: bool, skip_notarget=True, q=None) -> None:
         """Extract worker method to handle 7zip folder and decompress each files."""
         if hasattr(self.header, 'main_streams') and self.header.main_streams is not None:
             src_end = self.src_start + self.header.main_streams.packinfo.packpositions[-1]
             numfolders = self.header.main_streams.unpackinfo.numfolders
             if numfolders == 1:
-                self.extract_single(fp, self.files, self.src_start, src_end, q, skip_notarget=skip_notarget)
+                self.extract_single(fp, self.files, path, self.src_start, src_end, q, skip_notarget=skip_notarget)
             else:
                 folders = self.header.main_streams.unpackinfo.folders
                 positions = self.header.main_streams.packinfo.packpositions
                 empty_files = [f for f in self.files if f.emptystream]
                 if not parallel:
-                    self.extract_single(fp, empty_files, 0, 0, q)
+                    self.extract_single(fp, empty_files, path, 0, 0, q)
                     for i in range(numfolders):
                         if skip_notarget:
                             if not any([self.target_filepath.get(f.id, None) for f in folders[i].files]):
                                 continue
-                        self.extract_single(fp, folders[i].files, self.src_start + positions[i],
+                        self.extract_single(fp, folders[i].files, path, self.src_start + positions[i],
                                             self.src_start + positions[i + 1], q, skip_notarget=skip_notarget)
                 else:
                     filename = getattr(fp, 'name', None)
-                    self.extract_single(open(filename, 'rb'), empty_files, 0, 0, q)
+                    self.extract_single(open(filename, 'rb'), empty_files, path, 0, 0, q)
                     extract_threads = []
                     exc_q = queue.Queue()  # type: queue.Queue
                     for i in range(numfolders):
@@ -1028,6 +1007,7 @@ class Worker:
                                 continue
                         p = threading.Thread(target=self.extract_single,
                                              args=(filename, folders[i].files,
+                                                   path,
                                                    self.src_start + positions[i], self.src_start + positions[i + 1],
                                                    q, exc_q, skip_notarget))
                         p.start()
@@ -1041,9 +1021,9 @@ class Worker:
                         raise exc_type(exc_val).with_traceback(exc_tb)
         else:
             empty_files = [f for f in self.files if f.emptystream]
-            self.extract_single(fp, empty_files, 0, 0, q)
+            self.extract_single(fp, empty_files, path, 0, 0, q)
 
-    def extract_single(self, fp: Union[BinaryIO, str], files, src_start: int, src_end: int,
+    def extract_single(self, fp: Union[BinaryIO, str], files, path, src_start: int, src_end: int,
                        q: Optional[queue.Queue], exc_q: Optional[queue.Queue] = None, skip_notarget=True) -> None:
         """Single thread extractor that takes file lists in single 7zip folder."""
         if files is None:
@@ -1057,22 +1037,55 @@ class Worker:
                 if q is not None:
                     q.put(('s', str(f.filename), str(f.compressed) if f.compressed is not None else '0'))
                 fileish = self.target_filepath.get(f.id, None)
-                if fileish is not None:
+                if fileish is None:
+                    if not f.emptystream:
+                        just_check.append(f)
+                else:
                     # delayed execution of crc check.
                     self._check(fp, just_check, src_end)
                     just_check = []
                     fileish.parent.mkdir(parents=True, exist_ok=True)
-                    with fileish.open(mode='wb') as ofp:
-                        if not f.emptystream:
-                            # extract to file
-                            crc32 = self.decompress(fp, f.folder, ofp, f.uncompressed, f.compressed, src_end)
-                            ofp.seek(0)
-                            if f.crc32 is not None and crc32 != f.crc32:
-                                raise CrcError("{}".format(f.filename))
+                    if not f.emptystream:
+                        if f.is_junction and not isinstance(fileish, MemIO) and sys.platform == "win32":
+                            with io.BytesIO() as ofp:
+                                self.decompress(fp, f.folder, ofp, f.uncompressed, f.compressed, src_end)
+                                dst: str = ofp.read().decode("utf-8")
+                                if is_target_path_valid(path, fileish.parent.joinpath(dst)):
+                                    # fileish.unlink(missing_ok=True) > py3.7
+                                    if fileish.exists():
+                                        fileish.unlink()
+                                        if sys.platform == "win32":  # hint for mypy
+                                            _winapi.CreateJunction(str(fileish), dst)  # noqa
+                                else:
+                                    raise Bad7zFile("Junction point out of target directory.")
+                        elif f.is_symlink and not isinstance(fileish, MemIO):
+                            with io.BytesIO() as omfp:
+                                self.decompress(fp, f.folder, omfp, f.uncompressed, f.compressed, src_end)
+                                omfp.seek(0)
+                                dst = omfp.read().decode("utf-8")
+                                # check sym_target points inside an archive target?
+                                if is_target_path_valid(path, fileish.parent.joinpath(dst)):
+                                    sym_target = pathlib.Path(dst)
+                                    # fileish.unlink(missing_ok=True) > py3.7
+                                    if fileish.exists():
+                                        fileish.unlink()
+                                    fileish.symlink_to(sym_target)
+                                else:
+                                    raise Bad7zFile("Symlink point out of target directory.")
+                        else:
+                            with fileish.open(mode='wb') as obfp:
+                                # extract to file
+                                crc32 = self.decompress(fp, f.folder, obfp, f.uncompressed, f.compressed, src_end)
+                                obfp.seek(0)
+                                if f.crc32 is not None and crc32 != f.crc32:
+                                    raise CrcError("{}".format(f.filename))
+                    else:
+                        # just create empty file
+                        if not isinstance(fileish, MemIO):
+                            fileish.touch()
                         else:
-                            pass  # just create empty file
-                elif not f.emptystream:
-                    just_check.append(f)
+                            with fileish.open() as ofp:
+                                pass
                 if q is not None:
                     q.put(('e', str(f.filename), str(f.uncompressed)))
             if not skip_notarget:
diff --git a/py7zr/win32compat.py b/py7zr/win32compat.py
index b2c3635..0771a65 100644
--- a/py7zr/win32compat.py
+++ b/py7zr/win32compat.py
@@ -169,3 +169,16 @@ if sys.platform == "win32":
             return pathlib.WindowsPath(rpath)
         else:
             return rpath
+
+
+def is_windows_native_python() -> bool:
+    return (
+        sys.platform == "win32"
+        and os.name == "nt"
+        and "cygwin" not in platform.system().lower()
+        and "cygwin" not in sys.platform
+    )
+
+
+def is_windows_unc_path(path) -> bool:
+    return sys.platform == "win32" and path.drive.startswith("\\\\")
diff --git a/tests/test_extract.py b/tests/test_extract.py
index ae7f2fd..633b76f 100644
--- a/tests/test_extract.py
+++ b/tests/test_extract.py
@@ -253,7 +253,7 @@ def test_skip():
     for i, cf in enumerate(archive.files):
         assert cf is not None
         archive.worker.register_filelike(cf.id, None)
-    archive.worker.extract(archive.fp, parallel=True)
+    archive.worker.extract(archive.fp, None, parallel=True)
     archive.close()
 
 
diff --git a/tests/test_zipslip.py b/tests/test_zipslip.py
new file mode 100644
index 0000000..27a1657
--- /dev/null
+++ b/tests/test_zipslip.py
@@ -0,0 +1,54 @@
+import os
+
+import pytest
+
+from py7zr import SevenZipFile
+from py7zr.exceptions import Bad7zFile
+from py7zr.helpers import check_archive_path, get_sanitized_output_path
+from py7zr.properties import FILTER_LZMA2, PRESET_DEFAULT
+
+testdata_path = os.path.join(os.path.dirname(__file__), "data")
+
+
+@pytest.mark.misc
+def test_check_archive_path():
+    bad_path = "../../.../../../../../../tmp/evil.sh"
+    assert not check_archive_path(bad_path)
+
+
+@pytest.mark.misc
+def test_get_sanitized_output_path_1(tmp_path):
+    bad_path = "../../.../../../../../../tmp/evil.sh"
+    with pytest.raises(Bad7zFile):
+        get_sanitized_output_path(bad_path, tmp_path)
+
+
+@pytest.mark.misc
+def test_get_sanitized_output_path_2(tmp_path):
+    good_path = "good.sh"
+    expected = tmp_path.joinpath(good_path)
+    assert expected == get_sanitized_output_path(good_path, tmp_path)
+
+
+@pytest.mark.misc
+def test_extract_path_traversal_attack(tmp_path):
+    my_filters = [
+        {"id": FILTER_LZMA2, "preset": PRESET_DEFAULT},
+    ]
+    target = tmp_path.joinpath("target.7z")
+
+    from base64 import b64decode
+    data = """
+    N3q8ryccAATOVjH2nAAAAAAAAAAVAAAAAAAAABe26oXgACYAIV0AEYhCRj30FjRzCg2kNp3mcg12
+    I+GegsuiKVIVM4p1+AQVAOAAqgBrXQAAgTMHrg/QluR8nz9HQQQEMnr/nRksOfngonWcaGZa9yk2
+    MshKgENm9F8IrHzRtlxA1rG7ojA2dU0VubucO3DWfQiBaqCqtidfv5DCj8LxFZ7PbZYke+rw7nKV
+    bAoyU7Z/s4bc+kz6VLwXgAAXBikBCXMABwsBAAEhIQEYDICrAAA=
+    """
+    bindata = b64decode(data)
+
+    with open(target, "wb") as archive:
+        archive.write(bindata)
+
+    with pytest.raises(Bad7zFile):
+        with SevenZipFile(target, "r") as archive:
+            archive.extractall(path=tmp_path)
