# Copyright 2017 Alethea Katherine Flowers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import argparse
import builtins
import copy
import json
import os
import platform
from textwrap import dedent
from unittest import mock

import pytest

import nox
from nox import _options, sessions, tasks
from nox.manifest import WARN_PYTHONS_IGNORED, Manifest

RESOURCES = os.path.join(os.path.dirname(__file__), "resources")


def session_func():
    pass


session_func.python = None
session_func.venv_backend = None
session_func.should_warn = dict()
session_func.tags = []


def session_func_with_python():
    pass


session_func_with_python.python = "3.8"
session_func_with_python.venv_backend = None


def session_func_venv_pythons_warning():
    pass


session_func_venv_pythons_warning.python = ["3.7"]
session_func_venv_pythons_warning.venv_backend = "none"
session_func_venv_pythons_warning.should_warn = {WARN_PYTHONS_IGNORED: ["3.7"]}


def test_load_nox_module():
    config = _options.options.namespace(noxfile=os.path.join(RESOURCES, "noxfile.py"))
    noxfile_module = tasks.load_nox_module(config)
    assert noxfile_module.SIGIL == "123"


def test_load_nox_module_expandvars():
    # Assert that variables are expanded when looking up the path to the Noxfile
    # This is particular importand in Windows when one needs to use variables like
    # %TEMP% to point to the noxfile.py
    with mock.patch.dict(os.environ, {"RESOURCES_PATH": RESOURCES}):
        if platform.system().lower().startswith("win"):
            config = _options.options.namespace(noxfile="%RESOURCES_PATH%/noxfile.py")
        else:
            config = _options.options.namespace(noxfile="${RESOURCES_PATH}/noxfile.py")
        noxfile_module = tasks.load_nox_module(config)
    assert noxfile_module.__file__ == os.path.join(RESOURCES, "noxfile.py")
    assert noxfile_module.SIGIL == "123"


def test_load_nox_module_not_found(caplog, tmp_path):
    bogus_noxfile = tmp_path / "bogus.py"
    config = _options.options.namespace(noxfile=str(bogus_noxfile))

    assert tasks.load_nox_module(config) == 2
    assert (
        f"Failed to load Noxfile {bogus_noxfile}, no such file exists." in caplog.text
    )


def test_load_nox_module_os_error(caplog):
    noxfile = os.path.join(RESOURCES, "noxfile.py")
    config = _options.options.namespace(noxfile=noxfile)
    with mock.patch("nox.tasks.check_nox_version", autospec=True) as version_checker:
        version_checker.side_effect = OSError
        assert tasks.load_nox_module(config) == 2
        assert f"Failed to load Noxfile {noxfile}" in caplog.text


@pytest.fixture
def reset_needs_version():
    """Do not leak ``nox.needs_version`` between tests."""
    try:
        yield
    finally:
        nox.needs_version = None


def test_load_nox_module_needs_version_static(reset_needs_version, tmp_path):
    text = dedent(
        """
        import nox
        nox.needs_version = ">=9999.99.99"
        """
    )
    noxfile = tmp_path / "noxfile.py"
    noxfile.write_text(text)
    config = _options.options.namespace(noxfile=str(noxfile))
    assert tasks.load_nox_module(config) == 2


def test_load_nox_module_needs_version_dynamic(reset_needs_version, tmp_path):
    text = dedent(
        """
        import nox
        NOX_NEEDS_VERSION = ">=9999.99.99"
        nox.needs_version = NOX_NEEDS_VERSION
        """
    )
    noxfile = tmp_path / "noxfile.py"
    noxfile.write_text(text)
    config = _options.options.namespace(noxfile=str(noxfile))
    tasks.load_nox_module(config)
    # Dynamic version requirements are not checked.
    assert nox.needs_version == ">=9999.99.99"


def test_discover_session_functions_decorator():
    # Define sessions using the decorator.
    @nox.session
    def foo():
        pass

    @nox.session
    def bar():
        pass

    @nox.session(name="not-a-bar")
    def not_a_bar():
        pass

    def notasession():
        pass

    # Mock up a noxfile.py module and configuration.
    mock_module = argparse.Namespace(
        __name__=foo.__module__, foo=foo, bar=bar, notasession=notasession
    )
    config = _options.options.namespace(sessions=(), keywords=(), posargs=[])

    # Get the manifest and establish that it looks like what we expect.
    manifest = tasks.discover_manifest(mock_module, config)
    sessions = list(manifest)
    assert [s.func for s in sessions] == [foo, bar, not_a_bar]
    assert [i.friendly_name for i in sessions] == ["foo", "bar", "not-a-bar"]


def test_filter_manifest():
    config = _options.options.namespace(
        sessions=None, pythons=(), keywords=(), posargs=[]
    )
    manifest = Manifest({"foo": session_func, "bar": session_func}, config)
    return_value = tasks.filter_manifest(manifest, config)
    assert return_value is manifest
    assert len(manifest) == 2


def test_filter_manifest_not_found():
    config = _options.options.namespace(
        sessions=("baz",), pythons=(), keywords=(), posargs=[]
    )
    manifest = Manifest({"foo": session_func, "bar": session_func}, config)
    return_value = tasks.filter_manifest(manifest, config)
    assert return_value == 3


def test_filter_manifest_pythons():
    config = _options.options.namespace(
        sessions=None, pythons=("3.8",), keywords=(), posargs=[]
    )
    manifest = Manifest(
        {"foo": session_func_with_python, "bar": session_func, "baz": session_func},
        config,
    )
    return_value = tasks.filter_manifest(manifest, config)
    assert return_value is manifest
    assert len(manifest) == 1


def test_filter_manifest_pythons_not_found(caplog):
    config = _options.options.namespace(
        sessions=None, pythons=("1.2",), keywords=(), posargs=[]
    )
    manifest = Manifest(
        {"foo": session_func_with_python, "bar": session_func, "baz": session_func},
        config,
    )
    return_value = tasks.filter_manifest(manifest, config)
    assert return_value == 3
    assert "Python version selection caused no sessions to be selected." in caplog.text


def test_filter_manifest_keywords():
    config = _options.options.namespace(
        sessions=None, pythons=(), keywords="foo or bar", posargs=[]
    )
    manifest = Manifest(
        {"foo": session_func, "bar": session_func, "baz": session_func}, config
    )
    return_value = tasks.filter_manifest(manifest, config)
    assert return_value is manifest
    assert len(manifest) == 2


def test_filter_manifest_keywords_not_found(caplog):
    config = _options.options.namespace(
        sessions=None, pythons=(), keywords="mouse or python", posargs=[]
    )
    manifest = Manifest(
        {"foo": session_func, "bar": session_func, "baz": session_func}, config
    )
    return_value = tasks.filter_manifest(manifest, config)
    assert return_value == 3
    assert "No sessions selected after filtering by keyword." in caplog.text


def test_filter_manifest_keywords_syntax_error():
    config = _options.options.namespace(
        sessions=None, pythons=(), keywords="foo:bar", posargs=[]
    )
    manifest = Manifest({"foo_bar": session_func, "foo_baz": session_func}, config)
    return_value = tasks.filter_manifest(manifest, config)
    assert return_value == 3


@pytest.mark.parametrize(
    "tags,session_count",
    [
        (None, 4),
        (["foo"], 3),
        (["bar"], 3),
        (["baz"], 1),
        (["foo", "bar"], 4),
        (["foo", "baz"], 3),
        (["foo", "bar", "baz"], 4),
    ],
)
def test_filter_manifest_tags(tags, session_count):
    @nox.session(tags=["foo"])
    def qux():
        pass

    @nox.session(tags=["bar"])
    def quux():
        pass

    @nox.session(tags=["foo", "bar"])
    def quuz():
        pass

    @nox.session(tags=["foo", "bar", "baz"])
    def corge():
        pass

    config = _options.options.namespace(
        sessions=None, pythons=(), posargs=[], tags=tags
    )
    manifest = Manifest(
        {
            "qux": qux,
            "quux": quux,
            "quuz": quuz,
            "corge": corge,
        },
        config,
    )
    return_value = tasks.filter_manifest(manifest, config)
    assert return_value is manifest
    assert len(manifest) == session_count


@pytest.mark.parametrize(
    "tags",
    [
        ["Foo"],
        ["not-found"],
    ],
    ids=[
        "tags-are-case-insensitive",
        "tag-does-not-exist",
    ],
)
def test_filter_manifest_tags_not_found(tags, caplog):
    @nox.session(tags=["foo"])
    def quux():
        pass

    config = _options.options.namespace(
        sessions=None, pythons=(), posargs=[], tags=tags
    )
    manifest = Manifest({"quux": quux}, config)
    return_value = tasks.filter_manifest(manifest, config)
    assert return_value == 3
    assert "Tag selection caused no sessions to be selected." in caplog.text


def test_honor_list_request_noop():
    config = _options.options.namespace(list_sessions=False)
    manifest = {"thing": mock.sentinel.THING}
    return_value = tasks.honor_list_request(manifest, global_config=config)
    assert return_value is manifest


@pytest.mark.parametrize(
    "description, module_docstring",
    [
        (None, None),
        (None, "hello docstring"),
        ("Bar", None),
        ("Bar", "hello docstring"),
    ],
)
def test_honor_list_request(description, module_docstring):
    config = _options.options.namespace(
        list_sessions=True, noxfile="noxfile.py", color=False
    )
    manifest = mock.create_autospec(Manifest)
    manifest.module_docstring = module_docstring
    manifest.list_all_sessions.return_value = [
        (argparse.Namespace(friendly_name="foo", description=description), True)
    ]
    return_value = tasks.honor_list_request(manifest, global_config=config)
    assert return_value == 0


def test_honor_list_request_skip_and_selected(capsys):
    config = _options.options.namespace(
        list_sessions=True, noxfile="noxfile.py", color=False
    )
    manifest = mock.create_autospec(Manifest)
    manifest.module_docstring = None
    manifest.list_all_sessions.return_value = [
        (argparse.Namespace(friendly_name="foo", description=None), True),
        (argparse.Namespace(friendly_name="bar", description=None), False),
    ]
    return_value = tasks.honor_list_request(manifest, global_config=config)
    assert return_value == 0

    out = capsys.readouterr().out

    assert "* foo" in out
    assert "- bar" in out


def test_honor_list_request_prints_docstring_if_present(capsys):
    config = _options.options.namespace(
        list_sessions=True, noxfile="noxfile.py", color=False
    )
    manifest = mock.create_autospec(Manifest)
    manifest.module_docstring = "Hello I'm a docstring"
    manifest.list_all_sessions.return_value = [
        (argparse.Namespace(friendly_name="foo", description=None), True),
        (argparse.Namespace(friendly_name="bar", description=None), False),
    ]

    return_value = tasks.honor_list_request(manifest, global_config=config)
    assert return_value == 0

    out = capsys.readouterr().out

    assert "Hello I'm a docstring" in out


def test_honor_list_request_doesnt_print_docstring_if_not_present(capsys):
    config = _options.options.namespace(
        list_sessions=True, noxfile="noxfile.py", color=False
    )
    manifest = mock.create_autospec(Manifest)
    manifest.module_docstring = None
    manifest.list_all_sessions.return_value = [
        (argparse.Namespace(friendly_name="foo", description=None), True),
        (argparse.Namespace(friendly_name="bar", description=None), False),
    ]

    return_value = tasks.honor_list_request(manifest, global_config=config)
    assert return_value == 0

    out = capsys.readouterr().out

    assert "Hello I'm a docstring" not in out


def test_empty_session_list_in_noxfile(capsys):
    config = _options.options.namespace(noxfile="noxfile.py", sessions=(), posargs=[])
    manifest = Manifest({"session": session_func}, config)
    return_value = tasks.filter_manifest(manifest, global_config=config)
    assert return_value == 0
    assert "No sessions selected." in capsys.readouterr().out


def test_empty_session_None_in_noxfile(capsys):
    config = _options.options.namespace(noxfile="noxfile.py", sessions=None, posargs=[])
    manifest = Manifest({"session": session_func}, config)
    return_value = tasks.filter_manifest(manifest, global_config=config)
    assert return_value == manifest


def test_verify_manifest_empty():
    config = _options.options.namespace(sessions=(), keywords=())
    manifest = Manifest({}, config)
    return_value = tasks.filter_manifest(manifest, global_config=config)
    assert return_value == 3


def test_verify_manifest_nonempty():
    config = _options.options.namespace(sessions=None, keywords=(), posargs=[])
    manifest = Manifest({"session": session_func}, config)
    return_value = tasks.filter_manifest(manifest, global_config=config)
    assert return_value == manifest


def test_verify_manifest_list(capsys):
    config = _options.options.namespace(sessions=(), keywords=(), posargs=[])
    manifest = Manifest({"session": session_func}, config)
    return_value = tasks.filter_manifest(manifest, global_config=config)
    assert return_value == 0
    assert "Please select a session" in capsys.readouterr().out


@pytest.mark.parametrize("with_warnings", [False, True], ids="with_warnings={}".format)
def test_run_manifest(with_warnings):
    # Set up a valid manifest.
    config = _options.options.namespace(stop_on_first_error=False)
    sessions_ = [
        mock.Mock(spec=sessions.SessionRunner),
        mock.Mock(spec=sessions.SessionRunner),
    ]
    manifest = Manifest({}, config)
    manifest._queue = copy.copy(sessions_)

    # Ensure each of the mocks returns a successful result
    for mock_session in sessions_:
        mock_session.execute.return_value = sessions.Result(
            session=mock_session, status=sessions.Status.SUCCESS
        )
        # we need the should_warn attribute, add some func
        if with_warnings:
            mock_session.name = "hello"
            mock_session.func = session_func_venv_pythons_warning
        else:
            mock_session.func = session_func

    # Run the manifest.
    results = tasks.run_manifest(manifest, global_config=config)

    # Verify the results look correct.
    assert len(results) == 2
    assert results[0].session == sessions_[0]
    assert results[1].session == sessions_[1]
    for result in results:
        assert isinstance(result, sessions.Result)
        assert result.status == sessions.Status.SUCCESS


def test_run_manifest_abort_on_first_failure():
    # Set up a valid manifest.
    config = _options.options.namespace(stop_on_first_error=True)
    sessions_ = [
        mock.Mock(spec=sessions.SessionRunner),
        mock.Mock(spec=sessions.SessionRunner),
    ]
    manifest = Manifest({}, config)
    manifest._queue = copy.copy(sessions_)

    # Ensure each of the mocks returns a successful result.
    for mock_session in sessions_:
        mock_session.execute.return_value = sessions.Result(
            session=mock_session, status=sessions.Status.FAILED
        )
        # we need the should_warn attribute, add some func
        mock_session.func = session_func

    # Run the manifest.
    results = tasks.run_manifest(manifest, global_config=config)

    # Verify the results look correct.
    assert len(results) == 1
    assert isinstance(results[0], sessions.Result)
    assert results[0].session == sessions_[0]
    assert results[0].status == sessions.Status.FAILED

    # Verify that only the first session was called.
    assert sessions_[0].execute.called
    assert not sessions_[1].execute.called


def test_print_summary_one_result():
    results = [mock.sentinel.RESULT]
    with mock.patch("nox.tasks.logger", autospec=True) as logger:
        answer = tasks.print_summary(results, object())
        assert not logger.warning.called
        assert not logger.success.called
        assert not logger.error.called
    assert answer is results


def test_print_summary():
    results = [
        sessions.Result(
            session=argparse.Namespace(friendly_name="foo"),
            status=sessions.Status.SUCCESS,
        ),
        sessions.Result(
            session=argparse.Namespace(friendly_name="bar"),
            status=sessions.Status.FAILED,
        ),
    ]
    with mock.patch.object(sessions.Result, "log", autospec=True) as log:
        answer = tasks.print_summary(results, object())
        assert log.call_count == 2
    assert answer is results


def test_create_report_noop():
    config = _options.options.namespace(report=None)
    with mock.patch.object(builtins, "open", autospec=True) as open_:
        results = tasks.create_report(mock.sentinel.RESULTS, config)
        assert not open_.called
    assert results is mock.sentinel.RESULTS


def test_create_report():
    config = _options.options.namespace(report="/path/to/report")
    results = [
        sessions.Result(
            session=argparse.Namespace(
                signatures=["foosig"], name="foo", func=object()
            ),
            status=sessions.Status.SUCCESS,
        )
    ]
    with mock.patch.object(builtins, "open", autospec=True) as open_:
        with mock.patch.object(json, "dump", autospec=True) as dump:
            answer = tasks.create_report(results, config)
            assert answer is results
            dump.assert_called_once_with(
                {
                    "result": 1,
                    "sessions": [
                        {
                            "name": "foo",
                            "signatures": ["foosig"],
                            "result": "success",
                            "result_code": 1,
                            "args": {},
                        }
                    ],
                },
                mock.ANY,
                indent=2,
            )
        open_.assert_called_once_with("/path/to/report", "w")


def test_final_reduce():
    config = object()
    assert tasks.final_reduce([True, True], config) == 0
    assert tasks.final_reduce([True, False], config) == 1
    assert tasks.final_reduce([], config) == 0
