# -*- coding: utf-8 -*-
# test_git.py
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
import os
import subprocess
import sys
from tempfile import TemporaryFile
from unittest import mock

from git import Git, refresh, GitCommandError, GitCommandNotFound, Repo, cmd
from test.lib import TestBase, fixture_path
from test.lib import with_rw_directory
from git.util import finalize_process

import os.path as osp

from git.compat import is_win


class TestGit(TestBase):
    @classmethod
    def setUpClass(cls):
        super(TestGit, cls).setUpClass()
        cls.git = Git(cls.rorepo.working_dir)

    def tearDown(self):
        import gc

        gc.collect()

    @mock.patch.object(Git, "execute")
    def test_call_process_calls_execute(self, git):
        git.return_value = ""
        self.git.version()
        self.assertTrue(git.called)
        self.assertEqual(git.call_args, ((["git", "version"],), {}))

    def test_call_unpack_args_unicode(self):
        args = Git._unpack_args("Unicode€™")
        mangled_value = "Unicode\u20ac\u2122"
        self.assertEqual(args, [mangled_value])

    def test_call_unpack_args(self):
        args = Git._unpack_args(["git", "log", "--", "Unicode€™"])
        mangled_value = "Unicode\u20ac\u2122"
        self.assertEqual(args, ["git", "log", "--", mangled_value])

    def test_it_raises_errors(self):
        self.assertRaises(GitCommandError, self.git.this_does_not_exist)

    def test_it_transforms_kwargs_into_git_command_arguments(self):
        self.assertEqual(["-s"], self.git.transform_kwargs(**{"s": True}))
        self.assertEqual(["-s", "5"], self.git.transform_kwargs(**{"s": 5}))
        self.assertEqual([], self.git.transform_kwargs(**{"s": None}))

        self.assertEqual(["--max-count"], self.git.transform_kwargs(**{"max_count": True}))
        self.assertEqual(["--max-count=5"], self.git.transform_kwargs(**{"max_count": 5}))
        self.assertEqual(["--max-count=0"], self.git.transform_kwargs(**{"max_count": 0}))
        self.assertEqual([], self.git.transform_kwargs(**{"max_count": None}))

        # Multiple args are supported by using lists/tuples
        self.assertEqual(
            ["-L", "1-3", "-L", "12-18"],
            self.git.transform_kwargs(**{"L": ("1-3", "12-18")}),
        )
        self.assertEqual(["-C", "-C"], self.git.transform_kwargs(**{"C": [True, True, None, False]}))

        # order is undefined
        res = self.git.transform_kwargs(**{"s": True, "t": True})
        self.assertEqual({"-s", "-t"}, set(res))

    def test_it_executes_git_to_shell_and_returns_result(self):
        self.assertRegex(self.git.execute(["git", "version"]), r"^git version [\d\.]{2}.*$")

    def test_it_accepts_stdin(self):
        filename = fixture_path("cat_file_blob")
        with open(filename, "r") as fh:
            self.assertEqual(
                "70c379b63ffa0795fdbfbc128e5a2818397b7ef8",
                self.git.hash_object(istream=fh, stdin=True),
            )

    @mock.patch.object(Git, "execute")
    def test_it_ignores_false_kwargs(self, git):
        # this_should_not_be_ignored=False implies it *should* be ignored
        self.git.version(pass_this_kwarg=False)
        self.assertTrue("pass_this_kwarg" not in git.call_args[1])

    def test_it_raises_proper_exception_with_output_stream(self):
        tmp_file = TemporaryFile()
        self.assertRaises(
            GitCommandError,
            self.git.checkout,
            "non-existent-branch",
            output_stream=tmp_file,
        )

    def test_it_accepts_environment_variables(self):
        filename = fixture_path("ls_tree_empty")
        with open(filename, "r") as fh:
            tree = self.git.mktree(istream=fh)
            env = {
                "GIT_AUTHOR_NAME": "Author Name",
                "GIT_AUTHOR_EMAIL": "author@example.com",
                "GIT_AUTHOR_DATE": "1400000000+0000",
                "GIT_COMMITTER_NAME": "Committer Name",
                "GIT_COMMITTER_EMAIL": "committer@example.com",
                "GIT_COMMITTER_DATE": "1500000000+0000",
            }
            commit = self.git.commit_tree(tree, m="message", env=env)
            self.assertEqual(commit, "4cfd6b0314682d5a58f80be39850bad1640e9241")

    def test_persistent_cat_file_command(self):
        # read header only
        hexsha = "b2339455342180c7cc1e9bba3e9f181f7baa5167"
        g = self.git.cat_file(batch_check=True, istream=subprocess.PIPE, as_process=True)
        g.stdin.write(b"b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
        g.stdin.flush()
        obj_info = g.stdout.readline()

        # read header + data
        g = self.git.cat_file(batch=True, istream=subprocess.PIPE, as_process=True)
        g.stdin.write(b"b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
        g.stdin.flush()
        obj_info_two = g.stdout.readline()
        self.assertEqual(obj_info, obj_info_two)

        # read data - have to read it in one large chunk
        size = int(obj_info.split()[2])
        g.stdout.read(size)
        g.stdout.read(1)

        # now we should be able to read a new object
        g.stdin.write(b"b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
        g.stdin.flush()
        self.assertEqual(g.stdout.readline(), obj_info)

        # same can be achieved using the respective command functions
        hexsha, typename, size = self.git.get_object_header(hexsha)
        hexsha, typename_two, size_two, _ = self.git.get_object_data(hexsha)
        self.assertEqual(typename, typename_two)
        self.assertEqual(size, size_two)

    def test_version(self):
        v = self.git.version_info
        self.assertIsInstance(v, tuple)
        for n in v:
            self.assertIsInstance(n, int)
        # END verify number types

    def test_cmd_override(self):
        prev_cmd = self.git.GIT_PYTHON_GIT_EXECUTABLE
        exc = GitCommandNotFound
        try:
            # set it to something that doesn't exist, assure it raises
            type(self.git).GIT_PYTHON_GIT_EXECUTABLE = osp.join(
                "some", "path", "which", "doesn't", "exist", "gitbinary"
            )
            self.assertRaises(exc, self.git.version)
        finally:
            type(self.git).GIT_PYTHON_GIT_EXECUTABLE = prev_cmd
        # END undo adjustment

    def test_refresh(self):
        # test a bad git path refresh
        self.assertRaises(GitCommandNotFound, refresh, "yada")

        # test a good path refresh
        which_cmd = "where" if is_win else "which"
        path = os.popen("{0} git".format(which_cmd)).read().strip().split("\n")[0]
        refresh(path)

    def test_options_are_passed_to_git(self):
        # This work because any command after git --version is ignored
        git_version = self.git(version=True).NoOp()
        git_command_version = self.git.version()
        self.assertEqual(git_version, git_command_version)

    def test_persistent_options(self):
        git_command_version = self.git.version()
        # analog to test_options_are_passed_to_git
        self.git.set_persistent_git_options(version=True)
        git_version = self.git.NoOp()
        self.assertEqual(git_version, git_command_version)
        # subsequent calls keep this option:
        git_version_2 = self.git.NoOp()
        self.assertEqual(git_version_2, git_command_version)

        # reset to empty:
        self.git.set_persistent_git_options()
        self.assertRaises(GitCommandError, self.git.NoOp)

    def test_single_char_git_options_are_passed_to_git(self):
        input_value = "TestValue"
        output_value = self.git(c="user.name=%s" % input_value).config("--get", "user.name")
        self.assertEqual(input_value, output_value)

    def test_change_to_transform_kwargs_does_not_break_command_options(self):
        self.git.log(n=1)

    def test_insert_after_kwarg_raises(self):
        # This isn't a complete add command, which doesn't matter here
        self.assertRaises(ValueError, self.git.remote, "add", insert_kwargs_after="foo")

    def test_env_vars_passed_to_git(self):
        editor = "non_existent_editor"
        with mock.patch.dict("os.environ", {"GIT_EDITOR": editor}):  # @UndefinedVariable
            self.assertEqual(self.git.var("GIT_EDITOR"), editor)

    @with_rw_directory
    def test_environment(self, rw_dir):
        # sanity check
        self.assertEqual(self.git.environment(), {})

        # make sure the context manager works and cleans up after itself
        with self.git.custom_environment(PWD="/tmp"):
            self.assertEqual(self.git.environment(), {"PWD": "/tmp"})

        self.assertEqual(self.git.environment(), {})

        old_env = self.git.update_environment(VARKEY="VARVALUE")
        # The returned dict can be used to revert the change, hence why it has
        # an entry with value 'None'.
        self.assertEqual(old_env, {"VARKEY": None})
        self.assertEqual(self.git.environment(), {"VARKEY": "VARVALUE"})

        new_env = self.git.update_environment(**old_env)
        self.assertEqual(new_env, {"VARKEY": "VARVALUE"})
        self.assertEqual(self.git.environment(), {})

        path = osp.join(rw_dir, "failing-script.sh")
        with open(path, "wt") as stream:
            stream.write("#!/usr/bin/env sh\n" "echo FOO\n")
        os.chmod(path, 0o777)

        rw_repo = Repo.init(osp.join(rw_dir, "repo"))
        remote = rw_repo.create_remote("ssh-origin", "ssh://git@server/foo")

        with rw_repo.git.custom_environment(GIT_SSH=path):
            try:
                remote.fetch()
            except GitCommandError as err:
                self.assertIn("FOO", str(err))

    def test_handle_process_output(self):
        from git.cmd import handle_process_output

        line_count = 5002
        count = [None, 0, 0]

        def counter_stdout(line):
            count[1] += 1

        def counter_stderr(line):
            count[2] += 1

        cmdline = [
            sys.executable,
            fixture_path("cat_file.py"),
            str(fixture_path("issue-301_stderr")),
        ]
        proc = subprocess.Popen(
            cmdline,
            stdin=None,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=False,
            creationflags=cmd.PROC_CREATIONFLAGS,
        )

        handle_process_output(proc, counter_stdout, counter_stderr, finalize_process)

        self.assertEqual(count[1], line_count)
        self.assertEqual(count[2], line_count)
