# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt

"""Coverage data for coverage.py.

This file had the 4.x JSON data support, which is now gone.  This file still
has storage-agnostic helpers, and is kept to avoid changing too many imports.
CoverageData is now defined in sqldata.py, and imported here to keep the
imports working.

"""

from __future__ import annotations

import functools
import glob
import hashlib
import os.path

from typing import Callable, Iterable

from coverage.exceptions import CoverageException, NoDataError
from coverage.files import PathAliases
from coverage.misc import Hasher, file_be_gone, human_sorted, plural
from coverage.sqldata import CoverageData


def line_counts(data: CoverageData, fullpath: bool = False) -> dict[str, int]:
    """Summarize how many lines were executed in each measured file.

    `data` supplies the measured file names and the executed lines of each.

    When `fullpath` is true the result is keyed by the file names exactly as
    `data` reports them; otherwise it is keyed by their basenames, so two
    measured files with the same basename share a key and the later one wins.

    Returns a dict mapping each key to the number of executed lines.

    """
    # Choose once how a measured file name becomes a key in the result.
    name_for: Callable[[str], str] = (lambda path: path) if fullpath else os.path.basename

    counts = {}
    for measured in data.measured_files():
        executed = data.lines(measured)
        # Every file reported as measured is expected to have line data.
        assert executed is not None
        counts[name_for(measured)] = len(executed)
    return counts


def add_data_to_hash(data: CoverageData, filename: str, hasher: Hasher) -> None:
    """Feed the measured results for `filename` into `hasher`.

    `hasher` is a `coverage.misc.Hasher` instance.  It is updated first with
    the file's sorted arcs (when `data` has arcs) or its sorted lines, and
    then with the file's tracer name.  Only results data is contributed, not
    run data.

    """
    if not data.has_arcs():
        hasher.update(sorted_lines(data, filename))
    else:
        arcs = data.arcs(filename)
        # A file with no recorded arcs contributes an empty list.
        hasher.update(sorted(arcs) if arcs else [])

    tracer_name = data.file_tracer(filename)
    hasher.update(tracer_name)


def combinable_files(data_file: str, data_paths: Iterable[str] | None = None) -> list[str]:
    """Make a list of data files to be combined.

    `data_file` is a path to a data file.  `data_paths` is a list of files or
    directories of files.  If it is empty or not given, the directory
    containing `data_file` is searched.

    An entry that is a file is taken as-is.  An entry that is a directory
    contributes every file in it named like the basename of `data_file`
    followed by a dot and a suffix.

    Returns a sorted list of absolute file paths, with each path listed once.

    Raises `NoDataError` if an entry of `data_paths` is neither an existing
    file nor an existing directory.
    """
    data_dir, local = os.path.split(os.path.abspath(data_file))

    data_paths = data_paths or [data_dir]

    # Collect into a set: a file can be named explicitly and also be matched
    # by the glob of a listed directory.  Returning it twice would make
    # combine_parallel_data try to open it again after already deleting it.
    found: set[str] = set()
    for p in data_paths:
        if os.path.isfile(p):
            found.add(os.path.abspath(p))
        elif os.path.isdir(p):
            # Escape the literal part so glob metacharacters in directory or
            # file names are not interpreted as wildcards.
            pattern = glob.escape(os.path.join(os.path.abspath(p), local)) + ".*"
            found.update(glob.glob(pattern))
        else:
            raise NoDataError(f"Couldn't combine from non-existent path '{p}'")

    # SQLite might have made journal files alongside our database files.
    # We never want to combine those.
    #
    # Sorting isn't usually needed, since it shouldn't matter what order files
    # are combined, but sorting makes tests more predictable, and makes
    # debugging more understandable when things go wrong.
    return sorted(fnm for fnm in found if not fnm.endswith("-journal"))


def combine_parallel_data(
    data: CoverageData,
    aliases: PathAliases | None = None,
    data_paths: Iterable[str] | None = None,
    strict: bool = False,
    keep: bool = False,
    message: Callable[[str], None] | None = None,
) -> None:
    """Merge a set of parallel data files into `data`.

    `data` is the CoverageData that receives the combined results.  Its base
    file name is used as a prefix: candidate files are those named with that
    prefix followed by a dot.

    `aliases`, if given, is a `PathAliases` object whose `map` method is used
    to re-map recorded paths to match the local machine's.

    `data_paths`, if given, is a list of files or directories to combine.
    Files are used directly; directories are searched for files with the
    prefix described above.  Without `data_paths`, the directory holding the
    base data file is searched.

    Files whose contents are byte-for-byte identical to one already seen in
    this call are skipped rather than combined again.

    Unless `keep` is true, each file that was combined or skipped as a
    duplicate is deleted afterwards.  A file that can't be read produces a
    warning and is left on disk.

    If `strict` is true, `NoDataError` is raised when no candidate files are
    found, or when none of them could be combined.

    `message` is a function to use for printing messages to the user.

    """
    candidates = combinable_files(data.base_filename(), data_paths)

    if strict and not candidates:
        raise NoDataError("No data to combine")

    # Path re-mapping is memoized, since the same paths appear in many files.
    map_path = None if aliases is None else functools.lru_cache(maxsize=None)(aliases.map)

    seen_digests = set()
    combined_any = False

    for path in candidates:
        if path == data.data_filename():
            # Sometimes we are combining into a file which is one of the
            # parallel files.  Skip that file.
            if data._debug.should("dataio"):
                data._debug.write(f"Skipping combining ourself: {path!r}")
            continue

        try:
            shown_name = os.path.relpath(path)
        except ValueError:
            # On Windows, relpath raises ValueError when the current directory
            # is on a different drive than the file.  Show the path as given.
            shown_name = path

        # Fingerprint the raw bytes so identical files are only combined once.
        with open(path, "rb") as fobj:
            digest = hashlib.new("sha3_256", fobj.read()).digest()
        is_duplicate = digest in seen_digests

        should_delete = not keep
        if is_duplicate:
            if message:
                message(f"Skipping duplicate data {shown_name}")
        else:
            if data._debug.should("dataio"):
                data._debug.write(f"Combining data file {path!r}")
            seen_digests.add(digest)
            try:
                new_data = CoverageData(path, debug=data._debug)
                new_data.read()
            except CoverageException as exc:
                if data._warn:
                    # The CoverageException has the file name in it, so just
                    # use the message as the warning.
                    data._warn(str(exc))
                if message:
                    message(f"Couldn't combine data file {shown_name}: {exc}")
                # Keep unreadable files around so they can be examined.
                should_delete = False
            else:
                data.update(new_data, map_path=map_path)
                combined_any = True
                if message:
                    message(f"Combined data file {shown_name}")

        if should_delete:
            if data._debug.should("dataio"):
                data._debug.write(f"Deleting data file {path!r}")
            file_be_gone(path)

    if strict and not combined_any:
        raise NoDataError("No usable data files")


def debug_data_file(filename: str) -> None:
    """Implementation of 'coverage debug data'.

    Prints the resolved path of the data file named by `filename`.  If that
    file exists, it is read and a report is printed: whether it has arcs, how
    many files were measured, and for each measured file its executed line
    count and its file tracer, if any.

    """
    data = CoverageData(filename)
    data_path = data.data_filename()
    print(f"path: {data_path}")
    if not os.path.exists(data_path):
        print("No data collected: file doesn't exist")
        return

    data.read()
    print(f"has_arcs: {data.has_arcs()!r}")

    counts = line_counts(data, fullpath=True)
    ordered_names = human_sorted(counts.keys())
    nfiles = len(ordered_names)
    print(f"{nfiles} file{plural(nfiles)}:")
    for name in ordered_names:
        nlines = counts[name]
        description = f"{name}: {nlines} line{plural(nlines)}"
        tracer = data.file_tracer(name)
        # Only mention the tracer for files that were measured by a plugin.
        suffix = f" [{tracer}]" if tracer else ""
        print(description + suffix)


def sorted_lines(data: CoverageData, filename: str) -> list[int]:
    """Return the executed lines recorded for `filename`, in ascending order.

    Gives an empty list when `data` has no lines for the file.

    """
    recorded = data.lines(filename)
    if not recorded:
        return []
    return sorted(recorded)
