"""
Low-level OS functionality wrappers used by pathlib.
"""

from errno import *
from io import TextIOWrapper
import os
import sys
try:
    from io import text_encoding
except ImportError:
    def text_encoding(encoding):
        return encoding
try:
    import fcntl
except ImportError:
    fcntl = None
try:
    import posix
except ImportError:
    posix = None
try:
    import _winapi
except ImportError:
    _winapi = None


def _get_copy_blocksize(infd):
    """Pick the chunk size to request from an OS-level fast-copy call.

    The size of the file behind *infd* is used when it can be obtained, but
    never less than 8 MiB; if os.fstat() fails with OSError, 128 MiB is
    used instead. Callers are expected to keep copying in a loop until a
    call reports 0 bytes, so a value that is smaller or larger than the
    real file size (for instance because the file changes during the copy)
    is harmless.
    """
    try:
        file_size = os.fstat(infd).st_size
    except OSError:
        size = 2 ** 27  # 128 MiB
    else:
        size = max(file_size, 2 ** 23)  # at least 8 MiB
    if sys.maxsize < 2 ** 32:
        # 32-bit build: cap at 1 GiB to avoid OverflowError (gh-82500).
        size = min(size, 2 ** 30)
    return size


if not fcntl or not hasattr(fcntl, 'FICLONE'):
    # No fcntl module, or it does not expose the FICLONE request.
    _ficlone = None
else:
    def _ficlone(source_fd, target_fd):
        """
        Make target_fd a lightweight clone of source_fd by issuing the
        FICLONE ioctl on the target. Data blocks are only copied once they
        are modified; this is known as Copy on Write (CoW), instantaneous
        copy or reflink.
        """
        fcntl.ioctl(target_fd, fcntl.FICLONE, source_fd)


if not posix or not hasattr(posix, '_fcopyfile'):
    # No posix module, or it does not provide _fcopyfile.
    _fcopyfile = None
else:
    def _fcopyfile(source_fd, target_fd):
        """
        Copy the content of a regular file from source_fd to target_fd
        with the high-performance fcopyfile(3) syscall (macOS), requesting
        a data-only copy.
        """
        flags = posix._COPYFILE_DATA
        posix._fcopyfile(source_fd, target_fd, flags)


if not hasattr(os, 'copy_file_range'):
    _copy_file_range = None
else:
    def _copy_file_range(source_fd, target_fd):
        """
        Copy data from one regular mmap-like fd to another with the
        high-performance copy_file_range(2) syscall, which lets the
        filesystem use reflinks or a server-side copy where it can.
        This should work on Linux >= 4.5 only.

        Data is written to target_fd starting at offset 0, and the call is
        repeated until it reports that nothing more was copied.
        """
        chunk_size = _get_copy_blocksize(source_fd)
        written = 0
        while copied := os.copy_file_range(source_fd, target_fd, chunk_size,
                                           offset_dst=written):
            written += copied
        # A return value of 0 means EOF: nothing left to copy.


if not hasattr(os, 'sendfile'):
    _sendfile = None
else:
    def _sendfile(source_fd, target_fd):
        """Copy data from one regular mmap-like fd to another with the
        high-performance sendfile(2) syscall.
        This should work on Linux >= 2.6.33 only.

        Reading starts at offset 0 of source_fd, and the call is repeated
        until it reports that nothing more was sent.
        """
        chunk_size = _get_copy_blocksize(source_fd)
        position = 0
        while count := os.sendfile(target_fd, source_fd, position,
                                   chunk_size):
            position += count
        # A return value of 0 means EOF: nothing left to send.


if not _winapi or not hasattr(_winapi, 'CopyFile2'):
    # No _winapi module, or it does not provide CopyFile2.
    copyfile2 = None
else:
    def copyfile2(source, target):
        """
        Copy the file *source* to *target* using CopyFile2 (Windows only),
        passing 0 as the flags argument.
        """
        flags = 0
        _winapi.CopyFile2(source, target, flags)


def copyfileobj(source_f, target_f):
    """
    Copy data from file-like object source_f to file-like object target_f.

    When both objects provide a file descriptor through fileno(), the
    OS-level helpers that are available on this platform are tried in
    order: _ficlone, _fcopyfile, _copy_file_range and _sendfile. A helper
    that fails with one of the errno values tolerated for it is skipped and
    the next one is tried. Any other OSError is re-raised after its
    'filename' and 'filename2' attributes have been set from the 'name'
    attributes of source_f and target_f (None for an object without a
    'name').

    If no descriptors are available, or no helper succeeded, the data is
    copied with read() and write() in chunks of 1 MiB.
    """
    try:
        source_fd = source_f.fileno()
        target_fd = target_f.fileno()
    except Exception:
        pass  # Fall through to generic code.
    else:
        try:
            # Use OS copy-on-write where available.
            if _ficlone:
                try:
                    _ficlone(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno not in (EBADF, EOPNOTSUPP, ETXTBSY, EXDEV):
                        raise

            # Use OS copy where available.
            if _fcopyfile:
                try:
                    _fcopyfile(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno not in (EINVAL, ENOTSUP):
                        raise
            if _copy_file_range:
                try:
                    _copy_file_range(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno not in (ETXTBSY, EXDEV):
                        raise
            if _sendfile:
                try:
                    _sendfile(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno != ENOTSOCK:
                        raise
        except OSError as err:
            # Produce more useful error messages. A stream may provide
            # fileno() without a 'name' attribute; reading it
            # unconditionally would raise AttributeError here and hide the
            # OSError that is actually being reported.
            err.filename = getattr(source_f, 'name', None)
            err.filename2 = getattr(target_f, 'name', None)
            raise

    # Last resort: copy with fileobj read() and write().
    read_source = source_f.read
    write_target = target_f.write
    while buf := read_source(1024 * 1024):
        write_target(buf)


def _open_reader(obj):
    """
    Call the __open_reader__ method found on the type of *obj* and return
    its result. Raise TypeError if the type has no such attribute.
    """
    obj_type = type(obj)
    try:
        method = obj_type.__open_reader__
    except AttributeError:
        message = f"{obj_type.__name__} can't be opened for reading"
        raise TypeError(message) from None
    return method(obj)


def _open_writer(obj, mode):
    """
    Call the __open_writer__ method found on the type of *obj*, passing
    *mode*, and return its result. Raise TypeError if the type has no such
    attribute.
    """
    obj_type = type(obj)
    try:
        method = obj_type.__open_writer__
    except AttributeError:
        message = f"{obj_type.__name__} can't be opened for writing"
        raise TypeError(message) from None
    return method(obj, mode)


def _open_updater(obj, mode):
    """
    Call the __open_updater__ method found on the type of *obj*, passing
    *mode*, and return its result. Raise TypeError if the type has no such
    attribute.
    """
    obj_type = type(obj)
    try:
        method = obj_type.__open_updater__
    except AttributeError:
        message = f"{obj_type.__name__} can't be opened for updating"
        raise TypeError(message) from None
    return method(obj, mode)


def vfsopen(obj, mode='r', buffering=-1, encoding=None, errors=None,
            newline=None):
    """
    Open the file pointed to by this path and return a file object, as
    the built-in open() function does.

    Unlike the built-in open() function, this function additionally accepts
    'openable' objects, which are objects with any of these special methods:

        __open_reader__()
        __open_writer__(mode)
        __open_updater__(mode)

    '__open_reader__' is called for 'r' mode; '__open_writer__' for 'a', 'w'
    and 'x' modes; and '__open_updater__' for 'r+' and 'w+' modes. If text
    mode is requested, the result is wrapped in an io.TextIOWrapper object.
    """
    if buffering != -1:
        raise ValueError("buffer size can't be customized")
    text = 'b' not in mode
    if text:
        # Call io.text_encoding() here to ensure any warning is raised at an
        # appropriate stack level.
        encoding = text_encoding(encoding)
    try:
        return open(obj, mode, buffering, encoding, errors, newline)
    except TypeError:
        pass
    if not text:
        if encoding is not None:
            raise ValueError("binary mode doesn't take an encoding argument")
        if errors is not None:
            raise ValueError("binary mode doesn't take an errors argument")
        if newline is not None:
            raise ValueError("binary mode doesn't take a newline argument")
    mode = ''.join(sorted(c for c in mode if c not in 'bt'))
    if mode == 'r':
        stream = _open_reader(obj)
    elif mode in ('a', 'w', 'x'):
        stream = _open_writer(obj, mode)
    elif mode in ('+r', '+w'):
        stream = _open_updater(obj, mode[1])
    else:
        raise ValueError(f'invalid mode: {mode}')
    if text:
        stream = TextIOWrapper(stream, encoding, errors, newline)
    return stream


def vfspath(obj):
    """
    Return the string representation of a virtual path object, obtained by
    calling the __vfspath__ method found on the type of *obj*. Raise
    TypeError if the type has no such attribute.
    """
    obj_type = type(obj)
    try:
        method = obj_type.__vfspath__
    except AttributeError:
        message = f"expected JoinablePath object, not {obj_type.__name__}"
        raise TypeError(message) from None
    return method(obj)


def ensure_distinct_paths(source, target):
    """
    Raise OSError(EINVAL) if *target* is equal to *source* or if *source*
    is one of the parents of *target*. The raised error carries the string
    forms of both paths in its 'filename' and 'filename2' attributes.
    """
    # Note: there is no straightforward, foolproof algorithm to determine
    # if one directory is within another (a particularly perverse example
    # would be a single network share mounted in one location via NFS, and
    # in another location via CIFS), so we simply check whether the target
    # path is lexically equal to, or within, the source path.
    if source == target:
        message = "Source and target are the same path"
    elif source in target.parents:
        message = "Source path is a parent of target path"
    else:
        return
    error = OSError(EINVAL, message)
    error.filename = vfspath(source)
    error.filename2 = vfspath(target)
    raise error


def ensure_different_files(source, target):
    """
    Raise OSError(EINVAL) if both paths refer to the same file.

    The comparison uses the values returned by the 'info._file_id'
    callables of both paths. If either path lacks that attribute, the paths
    themselves are compared instead. If obtaining a file ID raises OSError
    or ValueError, the files are treated as different.
    """
    try:
        get_source_id = source.info._file_id
        get_target_id = target.info._file_id
    except AttributeError:
        # No file IDs on offer: fall back to comparing the path objects.
        different = source != target
    else:
        try:
            different = get_source_id() != get_target_id()
        except (OSError, ValueError):
            different = True
    if different:
        return
    error = OSError(EINVAL, "Source and target are the same file")
    error.filename = vfspath(source)
    error.filename2 = vfspath(target)
    raise error
