# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.

import inspect
import os
import py
import pytest
import sys
import threading
import types

from debugpy.common import log, timestamp
import tests
from tests import code, logs
from tests.debug import runners, session, targets

# Test matrix of code targets and launch/attach runners. A full run
# (tests.full) covers every named target with all launch and attach-by-socket
# runners; otherwise only a plain program started via launch is exercised.

TARGETS, RUNNERS = (
    (targets.all_named, runners.all_launch + runners.all_attach_socket)
    if tests.full
    else ([targets.Program], [runners.launch])
)


@pytest.fixture(params=TARGETS)
def target(request):
    """Parametrized fixture that hands the test one entry of TARGETS per run."""
    selected_target = request.param
    return selected_target


@pytest.fixture(params=RUNNERS)
def run(request):
    """Parametrized fixture that hands the test one entry of RUNNERS per run."""
    selected_runner = request.param
    return selected_runner


@pytest.fixture(autouse=True)
def test_wrapper(request, long_tmpdir):
    """Autouse fixture giving every test its own session tmpdir and log directory.

    Before the test body runs, this resets the session counter, points
    session.Session.tmpdir at a per-worker subdirectory of long_tmpdir, and
    redirects log.log_dir to a per-test directory (removing any stale copy
    first).

    After the test, for each of the setup/call/teardown reports that is
    attached to request.node, it logs the phase outcome and saves the report
    text and captured stdout/stderr into the log directory. If any phase
    failed, an empty FAILED.log marker is written and logs.dump() is called.

    log.log_dir is always restored to its original value on exit.
    """

    def write_log(filename, data):
        # log.log_dir is looked up at call time, so the file lands in the
        # per-test directory configured further down.
        filename = os.path.join(log.log_dir, filename)
        if not isinstance(data, bytes):
            data = data.encode("utf-8")
        with open(filename, "wb") as f:
            f.write(data)

    session.Session.reset_counter()

    # Worker-specific isolation for tmpdir and log directory. The lookup with
    # a default cannot fail, so no exception handling is needed; "gw0" is used
    # when the pytest-xdist worker variable is not set.
    worker_suffix = "_" + os.environ.get("PYTEST_XDIST_WORKER", "gw0")

    session.Session.tmpdir = long_tmpdir / f"session{worker_suffix}"
    session.Session.tmpdir.ensure(dir=True)
    original_log_dir = log.log_dir

    try:
        if log.log_dir is None:
            log.log_dir = (long_tmpdir / f"debugpy_logs{worker_suffix}").strpath
        else:
            # Derive a subdirectory from the test node id: "::" becomes a path
            # separator, and characters that are awkward in file names are
            # replaced with numeric character references.
            log_subdir = request.node.nodeid
            log_subdir = log_subdir.replace("::", "/")
            for ch in r":?*|<>":
                log_subdir = log_subdir.replace(ch, f"&#{ord(ch)};")
            log.log_dir += "/" + log_subdir + worker_suffix

        # Best-effort removal of leftovers from a previous run; the directory
        # may simply not exist yet, so failures are deliberately ignored.
        try:
            py.path.local(log.log_dir).remove()
        except Exception:
            pass

        print("\n")  # make sure on-screen logs start on a new line
        with log.to_file(prefix="tests"):
            timestamp.reset()
            log.info("{0} started.", request.node.nodeid)
            log.describe_environment("Environment:")
            try:
                yield
            finally:
                failed = False
                # Reports are read off the node if present; phases without an
                # attached report are skipped.
                for report_attr in ("setup_report", "call_report", "teardown_report"):
                    try:
                        report = getattr(request.node, report_attr)
                    except AttributeError:
                        continue

                    failed |= report.failed
                    log.write_format(
                        "error" if report.failed else "info",
                        "pytest {0} phase for {1} {2}.",
                        report.when,
                        request.node.nodeid,
                        report.outcome,
                    )

                    write_log(report_attr + ".log", report.longreprtext)
                    write_log(report_attr + ".stdout.log", report.capstdout)
                    write_log(report_attr + ".stderr.log", report.capstderr)

                if failed:
                    write_log("FAILED.log", "")
                    logs.dump()

    finally:
        # NOTE(review): logs of passed tests are currently always kept - the
        # cleanup that used to honor request.config.option.debugpy_log_passed
        # was disabled, so that option has no effect here. Confirm whether the
        # cleanup should be reinstated.
        log.log_dir = original_log_dir


@pytest.fixture
def daemon(request):
    """Yield a factory that spawns daemon threads tied to the current test.

    factory(func, name_suffix="") creates a daemon thread that runs func, names
    it func.__name__ + name_suffix, starts it right away and returns it.

    At teardown, if the node has a call report and the call phase did not fail,
    every thread created through the factory is asserted to be no longer alive.
    """

    spawned = []

    def factory(func, name_suffix=""):
        worker = threading.Thread(
            target=func,
            name=func.__name__ + name_suffix,
            daemon=True,
        )
        # Track the thread before starting it so teardown can check on it.
        spawned.append(worker)
        worker.start()
        return worker

    yield factory

    try:
        call_failed = request.node.call_report.failed
    except AttributeError:
        # No usable call report on the node - nothing to verify.
        return

    if call_failed:
        # The test already failed; don't add thread-liveness assertions on top.
        return

    for worker in spawned:
        assert not worker.is_alive()


if sys.platform != "win32":

    @pytest.fixture
    def long_tmpdir(request, tmpdir):
        """Return tmpdir unchanged; short-vs-long path names only matter on Win32."""
        return tmpdir

else:
    import ctypes

    GetLongPathNameW = ctypes.windll.kernel32.GetLongPathNameW
    GetLongPathNameW.argtypes = [ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint32]
    GetLongPathNameW.restype = ctypes.c_uint32

    @pytest.fixture
    def long_tmpdir(request, tmpdir):
        """Like tmpdir, but ensures that it's a long rather than short filename on Win32.

        Returns a py.path.local for the long form of tmpdir's path. If the
        conversion fails, the original path is used as is.
        """
        path = tmpdir.strpath
        buffer = ctypes.create_unicode_buffer(512)
        length = GetLongPathNameW(path, buffer, len(buffer))

        if length >= len(buffer):
            # The buffer was too small: in that case the API returns the
            # required size (including the terminating NUL) rather than the
            # number of characters copied, and the buffer does not hold the
            # path. Retry once with a large enough buffer.
            buffer = ctypes.create_unicode_buffer(length + 1)
            length = GetLongPathNameW(path, buffer, len(buffer))

        # On success the return value is the number of characters copied,
        # excluding the NUL, so it is strictly less than the buffer size.
        # Zero means failure; keep the original path then.
        if 0 < length < len(buffer):
            path = buffer.value
        return py.path.local(path)


@pytest.fixture
def pyfile(request, long_tmpdir):
    """A fixture providing a factory function that generates .py files.

    The returned factory takes a single function with an empty argument list,
    generates a temporary file that contains the code corresponding to the
    function body, and returns the full path to the generated file. Idiomatic
    use is as a decorator, e.g.:

        @pyfile
        def script_file():
            print('fizz')
            print('buzz')

    will produce a temporary file named script_file.py containing:

        print('fizz')
        print('buzz')

    followed by a short trailer that imports time and sleeps for 2 seconds, so
    that the program doesn't exit before all of its messages can be handled.
    The variable script_file will contain the path to that file.

    In order for the factory to be able to extract the function body properly,
    function header ("def") must all be on a single line, with nothing after
    the colon but whitespace.

    Note that because the code is physically in a separate file when it runs,
    it cannot reuse top-level module imports - it must import all the modules
    that it uses locally. When linter complains, use #noqa.

    Returns a py.path.local instance that has the additional attribute "lines".
    After the source is written to disk, tests.code.get_marked_line_numbers() is
    invoked on the resulting file to compute the value of that attribute.

    The factory raises ValueError if the function header cannot be located, and
    asserts that the target file does not exist yet.
    """

    def factory(source):
        assert isinstance(source, types.FunctionType)
        name = source.__name__
        source, _ = inspect.getsourcelines(source)

        # First, find the "def" line. Decorator lines, if any, precede it.
        for def_lineno, line in enumerate(source):
            line = line.strip()
            if line.startswith("def") and line.endswith(":"):
                break
        else:
            raise ValueError("Failed to locate function header.")

        # Remove everything up to and including "def".
        source = source[def_lineno + 1 :]
        assert source

        # Now we need to adjust indentation. Compute how much the first
        # non-blank line of the body is indented by, then dedent all lines by
        # that amount. A blank line must not be used as the reference: its
        # newline counts as whitespace for lstrip(), which would yield a bogus
        # indent. Blank lines don't matter indentation-wise, and might not be
        # indented to begin with, so just replace them with a simple newline.
        first_code_line = next((s for s in source if s.strip()), "")
        indent = len(first_code_line) - len(first_code_line.lstrip())
        source = [s[indent:] if s.strip() else "\n" for s in source]
        source = "".join(source)

        # Add a sleep at the end so that the program doesn't exit before we can handle all of the messages it sent
        source += "\nimport time\ntime.sleep(2)\n"

        # Write it to file.
        tmpfile = long_tmpdir / (name + ".py")
        assert not tmpfile.check()
        tmpfile.write(source)

        tmpfile.lines = code.get_marked_line_numbers(tmpfile)
        return tmpfile

    return factory
