import copy
import datetime
import itertools
import os
import pkg_resources
import shutil
import tempfile
import unittest
import unittest.mock

import pygit2
import pytest

import gitubuntu.git_repository as target
from gitubuntu.git_repository import HeadInfoItem
from gitubuntu.repo_builder import (
    Blob,
    Commit,
    Placeholder,
    Repo,
    SourceTree,
    Symlink,
    Tree,
)
from gitubuntu.source_builder import Source, SourceSpec
import gitubuntu.spec
from gitubuntu.test_fixtures import (
    repo,
    pygit2_repo,
)
from gitubuntu.test_util import get_test_changelog


@pytest.mark.parametrize('same_remote_branch_names, different_remote_branch_names, expected', [
    ([], [], ''),
    (['pkg/ubuntu/xenial-devel',], [], 'pkg/ubuntu/xenial-devel'),
    (['pkg/ubuntu/xenial-security',], [], 'pkg/ubuntu/xenial-security'),
    (['pkg/ubuntu/xenial-updates', 'pkg/ubuntu/xenial-devel'], [],
        'pkg/ubuntu/xenial-devel'
    ),
    ([], ['pkg/ubuntu/xenial-updates', 'pkg/ubuntu/xenial-devel'],
        ''
    ),
    (['pkg/ubuntu/zesty-devel', 'pkg/ubuntu/zesty-proposed', 'pkg/ubuntu/devel'], [], 'pkg/ubuntu/devel'),
])
def test__derive_target_branch_string(same_remote_branch_names,
    different_remote_branch_names, expected
):
    remote_branch_objects = []
    for branch_name in same_remote_branch_names:
        b = unittest.mock.Mock()
        b.peel(pygit2.Tree).id = unittest.mock.sentinel.same_id
        b.branch_name = branch_name
        remote_branch_objects.append(b)
    for branch_name in different_remote_branch_names:
        b = unittest.mock.Mock()
        b.peel(pygit2.Tree).id = object() # need a different sentinel for each
        b.branch_name = branch_name
        remote_branch_objects.append(b)
    target_branch_string = target._derive_target_branch_string(
        remote_branch_objects
    )
    assert target_branch_string == expected


@pytest.mark.parametrize('changelog_name, expected', [
    ('test_versions_1', ['1.0', None]),
    ('test_versions_2', ['2.0', '1.0']),
    ('test_versions_3', ['4.0', '3.0']),
    ('test_versions_unknown', ['ss-970814-1', None]),
])
def test_changelog_versions(changelog_name, expected):
    test_changelog = get_test_changelog(changelog_name)
    assert [test_changelog.version, test_changelog.previous_version] == expected


@pytest.mark.parametrize('changelog_name, expected', [
    ('test_versions_unknown', ['ss-970814-1',]),
])
def test_changelog_all_versions(changelog_name, expected):
    test_changelog = get_test_changelog(changelog_name)
    assert test_changelog.all_versions == expected


def test_changelog_distribution():
    test_changelog = get_test_changelog('test_distribution')
    assert test_changelog.distribution == 'xenial'


def test_changelog_date():
    test_changelog = get_test_changelog('test_date_1')
    assert test_changelog.date == 'Mon, 12 May 2016 08:14:34 -0700'
    test_changelog = get_test_changelog('test_date_2')
    assert test_changelog.date == 'Mon, 12 May 2016 08:14:34 -0700'


@pytest.mark.parametrize('changelog_name, expected', [
    ('test_maintainer_1', 'Test Maintainer <test-maintainer@donotmail.com>'),
    ('test_maintainer_2', '<test-maintainer@donotmail.com>'),
])
def test_changelog_maintainer(changelog_name, expected):
    test_changelog = get_test_changelog(changelog_name)
    assert test_changelog.maintainer == expected


def test_changelog_maintainer_invalid():
    with pytest.raises(ValueError):
        test_changelog = get_test_changelog('test_maintainer_3')
        test_changelog.maintainer


def test_changelog_multiple_angle_brackets():
    """An email address with extra angle brackets should still parse"""
    test_changelog = get_test_changelog('test_multiple_angle_brackets')
    assert test_changelog.git_authorship()[1] == 'micah@debian.org'


@pytest.mark.parametrize(['input_date_string', 'expected_result'], [
    # The normal complete form
    ('Mon, 12 May 2016 08:14:34 -0700', (2016, 5, 12, 8, 14, 34, -7)),
    # Day of week missing, such as in:
    #     datefudge 1.12
    ('12 May 2016 08:14:34 -0700', (2016, 5, 12, 8, 14, 34, -7)),
    # Full (not abbreviated) month name, such as in:
    #     dnsmasq 2.32-2
    #     dropbear 0.42-1
    #     e2fsprogs 1.42.11-1
    #     efibootmgr 0.5.4-7
    #     hunspell-br 0.11-1
    #     kubuntu-default-settings 1:6.06-22
    #     libvformat 1.13-4
    ('12 June 2016 08:14:34 -0700', (2016, 6, 12, 8, 14, 34, -7)),
    # Full (not abbreviated) day of week name, such as in:
    #     logcheck 1.2.22a
    ('Thursday, 15 May 2016 08:14:34 -0700', (2016, 5, 15, 8, 14, 34, -7)),
    # Part-abbreviated day of week name, such as in:
    #     kubuntu-meta 1.76
    ('Thur, 15 May 2016 08:14:34 -0700', (2016, 5, 15, 8, 14, 34, -7)),
    #     apachetop 0.12.5-7
    ('Thurs, 12 Jan 2006 12:09:58 +0000', (2006, 1, 12, 12, 9, 58, 0)),
    #     easychem 0.6-0ubuntu1
    ('Tues, 20 Dec 2005 16:57:16 -0500', (2005, 12, 20, 16, 57, 16, -5)),
    # Part-abbreviated month name, such as in:
    #     libapp-cache-perl 0.35-1
    ('Wed, 24 Sept 2008 19:32:23 +0200', (2008, 9, 24, 19, 32, 23, 2)),
    #     libcrypt-hcesha-perl 0.70-2
    ('Wed,  24 Sept 2008 19:44:01 +0200', (2008, 9, 24, 19, 44, 1, 2)),
    #     gnome-shell-extension-tilix-shortcut 1.0.1-1
    ('Tue, 19 Sept 2017 14:23:17 +0200', (2017, 9, 19, 14, 23, 17, 2)),
])
def test_parse_changelog_date(input_date_string, expected_result):
    """_parse_changelog_date should parse a basic date string correctly

    :param str input_date_string: the timestamp part of the changelog signoff
        line
    :param tuple(int, int, int, int, int, int, int) expected_result: the
        expected parse result in (year, month, day, hour, minute, second,
        timezone_offset_in_hours) form. The actual expected result needs to be
        a datetime.datetime object; to avoid duplication in test parameters
        this will be instantiated within the test.
    """
    actual_result = target.Changelog._parse_changelog_date(input_date_string)
    expected_result_datetime = datetime.datetime(
        *expected_result[:6],
        tzinfo=datetime.timezone(datetime.timedelta(hours=expected_result[6])),
    )
    assert actual_result == expected_result_datetime


@pytest.mark.parametrize(['input_date_string'], [
    ('Mon, 30 Feb 2020 15:50:58 +0200',),  # ghostscript 9.50~dfsg-5ubuntu4
    ('Mon, 03 Sep 2018 00:43:25 -7000',),  # lxqt-config 0.13.0-0ubuntu4
    ('Tue, 17 May 2008 10:93:55 -0500',),  # iscsitarget
                                           #     0.4.15+svn148-2.1ubuntu1
    ('Monu, 22 Jan 2007 22:10:50 -0500',), # mail-spf-perl 2.004-0ubuntu1
    ('Wed, 29 Augl 2007 16:14:11 +0200',), # nut 2.2.0-2
])
def test_changelog_date_parse_errors(input_date_string):
    """_parse_changelog_date should raise ValueError on illegal dates

    :param str input_date_string: the timestamp part of the changelog signoff
        line
    """
    with pytest.raises(ValueError):
        target.Changelog._parse_changelog_date(input_date_string)


@pytest.mark.parametrize(
    'changelog_name, name, email, epoch_seconds, offset', [
        (
            'test_maintainer_1',
            'Test Maintainer',
            'test-maintainer@donotmail.com',
            0,
            0,
        ),
        (
            'test_maintainer_2',
            'Unnamed',  # git won't handle empty names; see the spec
            'test-maintainer@donotmail.com',
            0,
            0,
        ),
        (
            'test_date_1',
            'Test Maintainer',
            'test-maintainer@donotmail.com',
            1463066074,
            -420,
        ),
        (
            'test_date_2',
            'Test Maintainer',
            'test-maintainer@donotmail.com',
            1463066074,
            -420,
        ),
        (
            'maintainer_name_leading_space',
            'Test Maintainer',
            'test-maintainer@example.com',
            0,
            0,
        ),
        (
            'maintainer_name_trailing_space',
            'Test Maintainer',
            'test-maintainer@example.com',
            0,
            0,
        ),
        (
            'maintainer_name_inner_space',
            'Test  Maintainer',
            'test-maintainer@example.com',
            0,
            0,
        ),
])
def test_changelog_authorship(
    changelog_name,
    name,
    email,
    epoch_seconds,
    offset,
):
    result = get_test_changelog(changelog_name).git_authorship()
    assert result == (name, email, epoch_seconds, offset)


def test_changelog_utf8():
    test_changelog = get_test_changelog('test_utf8_error')
    assert test_changelog.version == '1.0.3-2'


def test_changelog_duplicate():
    # Changelog.all_versions should successfully return without an assertion

    # Xenial's dpkg-parsechangelog eliminates duplicate versions. Bionic's
    # dpkg-parsechangelog does not. We rely on the behaviour of
    # dpkg-parsechangelog from Bionic, where this test passes. The test fails
    # when using Xenial's dpkg-parsechangelog, where its behaviour doesn't
    # match our assumptions elsewhere.

    # -with-extra includes an extra changelog entry at the end. This is
    # currently needed to trip the assertion because it truncates the longer
    # list before its comparison. This will get fixed in a subsequent commit,
    # but using it here ensures that this test will correctly trip regardless
    # of the presence of that unrelated bug.
    test_changelog = get_test_changelog('duplicate-version-with-extra')
    test_changelog.all_versions


def test_changelog_all_versions_assertion_mismatched_length():
    # if Changelog.all_versions finds that self._changelog.versions mismatches
    # self._shell_all_versions, it is supposed to raise an assertion. Here is
    # an edge case where at one point in development it did not. We fake both
    # _changelog.versions and _shell_all_versions to an edge case where they
    # mismatch.
    with unittest.mock.patch(
        'gitubuntu.git_repository.Changelog._shell_all_versions',
        new_callable=unittest.mock.PropertyMock
    ) as mock_shell_all_versions:
        mock_shell_all_versions.return_value = ['a']
        test_changelog = target.Changelog(b'')
        test_changelog._changelog = unittest.mock.Mock()
        test_changelog._changelog.versions = ['a', 'b']
        with pytest.raises(target.ChangelogError):
            test_changelog.all_versions


@pytest.mark.parametrize('tree_func', [
    # The tree_func parameter is a function that accepts a mock Blob that is to
    # represent the changelog blob itself and returns a mock Tree with the mock
    # Blob embedded somewhere within it. The test function can then ensure that
    # follow_symlinks_to_blob can correctly find the changelog Blob given the
    # Tree.

    # Of course this is only expected to work if, after checking out the Tree,
    # "cat debian/changelog" would work. But this allows us to test the various
    # permutations of symlink following in Trees that _are_ valid.

    # Simple case
    lambda b: Tree({
        'debian': Tree({'changelog': b}),
    }),

    # Symlink in debian/
    lambda b: Tree({
        'debian': Tree({
                    'changelog.real': b,
                    'changelog': Symlink('changelog.real'),
                }),
    }),

    # Symlink to parent directory
    lambda b: Tree({
        'changelog': b,
        'debian': Tree({
            'changelog': Symlink('../changelog'),
        })
    }),

    # Symlink to subdirectory
    lambda b: Tree({
        'debian': Tree({
            'changelog': Symlink('subdirectory/changelog'),
            'subdirectory': Tree({'changelog': b}),
        })
    }),

    # debian/ itself is a symlink to a different directory
    lambda b: Tree({
        'pkg': Tree({'changelog': b}),
        'debian': Symlink('pkg'),
    })
])
def test_follow_symlinks_to_blob(pygit2_repo, tree_func):
    blob = Blob(b'')
    blob_id = blob.write(pygit2_repo)
    tree = pygit2_repo.get(tree_func(blob).write(pygit2_repo))
    result_blob = target.follow_symlinks_to_blob(
        pygit2_repo,
        tree,
        'debian/changelog',
    )
    assert result_blob.id == blob_id


@pytest.mark.parametrize('tree', [
    Tree({}),
    Tree({'debian': Tree({})}),
    Tree({'debian': Tree({'changelog': Symlink('other')})}),
    Tree({'debian': Tree({'changelog': Symlink('../other')})}),
])
def test_follow_symlinks_to_blob_not_found(pygit2_repo, tree):
    pygit2_tree = pygit2_repo.get(tree.write(pygit2_repo))
    with pytest.raises(KeyError):
        target.follow_symlinks_to_blob(
            pygit2_repo,
            pygit2_tree,
            'debian/changelog',
        )


def test_renameable_dir_basename(tmpdir):
    p = tmpdir.join('foo')
    p.ensure()
    rd = target.RenameableDir(str(p))
    assert rd.basename == 'foo'


def test_renameable_dir_basename_setter(tmpdir):
    p = tmpdir.join('foo')
    p.ensure()
    rd = target.RenameableDir(str(p))
    rd.basename = 'bar'
    assert rd.basename == 'bar'
    assert tmpdir.join('bar').check()


def test_dot_git_match(tmpdir):
    for name in ['.git', 'git', '..git', 'other']:
        tmpdir.join(name).ensure()

    result = set(
        x.basename
        for x in tmpdir.listdir(
            fil=lambda x: target._dot_git_match(str(x.basename))
        )
    )
    assert result == set(['.git', '..git'])


def test_renameable_dir_listdir(tmpdir):
    for name in ['.git', 'git', '..git', 'other']:
        tmpdir.join(name).ensure()
    rd = target.RenameableDir(str(tmpdir))
    result = set(rd.listdir(target._dot_git_match))
    assert result == set([
        target.RenameableDir(os.path.join(str(tmpdir), '.git')),
        target.RenameableDir(os.path.join(str(tmpdir), '..git')),
    ])


def test_renamable_dir_recursive(tmpdir):
    a = tmpdir.join('foo')
    a.ensure_dir()
    b = tmpdir.join('bar')
    b.ensure()
    assert target.RenameableDir(str(a)).recursive
    assert not target.RenameableDir(str(b)).recursive


def test_renameable_dir_recursive_symlink_directory(tmpdir):
    """A RenameableDir should not treat a broken symlink as recursive"""
    test_symlink = tmpdir.join('foo')
    nonexistent_file = tmpdir.join('nonexistent_file')
    test_symlink.mksymlinkto(nonexistent_file)
    assert not target.RenameableDir(str(test_symlink)).recursive


def test_renameable_dir_str(tmpdir):
    p = tmpdir.join('foo')
    p.ensure()
    rd = target.RenameableDir(str(p))
    assert str(rd) == os.path.join(str(tmpdir), 'foo')


def test_renameable_dir_repr(tmpdir):
    p = tmpdir.join('foo')
    p.ensure()
    rd = target.RenameableDir(str(p))
    assert repr(rd) == ("RenameableDir('%s/foo')" % str(tmpdir))


def test_renameable_dir_hash_eq(tmpdir):
    p1a = tmpdir.join('foo')
    p1b = tmpdir.join('foo')
    p2 = tmpdir.join('bar')

    p1a.ensure()
    p2.ensure()

    rd1a = target.RenameableDir(str(p1a))
    rd1b = target.RenameableDir(str(p1b))
    rd2 = target.RenameableDir(str(p2))

    assert rd1a == rd1b
    assert rd1a != rd2


def test_renameable_dir_must_exist(tmpdir):
    """A RenameableDir should reject a path that doesn't exist"""
    with pytest.raises(FileNotFoundError):
        target.RenameableDir(tmpdir.join('a'))


def test_fake_renameable_dir_basename():
    path = target.FakeRenameableDir('foo', None)
    assert path.basename == 'foo'


def test_fake_renameable_dir_basename_setter():
    path = target.FakeRenameableDir('foo', None)
    path.basename = 'bar'
    assert path.basename == 'bar'


def test_fake_renameable_dir_listdir():
    path = target.FakeRenameableDir(None, [
        target.FakeRenameableDir('.git', None),
        target.FakeRenameableDir('git', None),
        target.FakeRenameableDir('..git', None),
        target.FakeRenameableDir('other', None),
    ])
    result = set(x.basename for x in path.listdir(fil=target._dot_git_match))
    assert result == set(['.git', '..git'])


def test_fake_renameable_dir_recursive():
    assert target.FakeRenameableDir(['foo'], []).recursive
    assert not target.FakeRenameableDir(['foo'], None).recursive


def test_fake_renameable_dir_hash_eq():
    variations = [
        target.FakeRenameableDir(None, None),
        target.FakeRenameableDir(None, []),
        target.FakeRenameableDir('foo', []),
        target.FakeRenameableDir(None, [
            target.FakeRenameableDir('foo', None)]
        ),
        target.FakeRenameableDir(None, [
            target.FakeRenameableDir('foo', [
                target.FakeRenameableDir('bar', None)
            ]),
        ]),
    ]
    for a, b in itertools.product(variations, variations):
        if a is b:
            assert a == b
        else:
            assert a != b


def test_fake_renameable_dir_repr():
    rd = target.FakeRenameableDir('foo', [target.FakeRenameableDir('bar', [])])
    assert (
        repr(rd) == "FakeRenameableDir('foo', [FakeRenameableDir('bar', [])])"
    )


@pytest.mark.parametrize('initial,expected', [
    # Empty directory remains unchanged
    (
        target.FakeRenameableDir(None, []),
        target.FakeRenameableDir(None, []),
    ),
    # Basic .git -> ..git escape
    (
        target.FakeRenameableDir(
            None,
            [target.FakeRenameableDir('.git', None)],
        ),
        target.FakeRenameableDir(
            None,
            [target.FakeRenameableDir('..git', None)],
        ),
    ),
    # .git contains a .git
    (
        target.FakeRenameableDir(
            None,
            [
                target.FakeRenameableDir(
                    '.git',
                    [target.FakeRenameableDir('.git', None)],
                )
            ],
        ),
        target.FakeRenameableDir(
            None,
            [
                target.FakeRenameableDir(
                    '..git',
                    [target.FakeRenameableDir('..git', None)],
                )
            ],
        ),
    ),
    # git remains unchanged
    (
        target.FakeRenameableDir(
            None,
            [target.FakeRenameableDir('git', None)],
        ),
        target.FakeRenameableDir(
            None,
            [target.FakeRenameableDir('git', None)],
        ),
    ),
    # .git and ..git both exist
    (
        target.FakeRenameableDir(
            None,
            [
                target.FakeRenameableDir('.git', None),
                target.FakeRenameableDir('..git', None),
            ],
        ),
        target.FakeRenameableDir(
            None,
            [
                target.FakeRenameableDir('..git', None),
                target.FakeRenameableDir('...git', None),
            ],
        ),
    ),
    # Ordinary directory contains a .git
    (
        target.FakeRenameableDir(
            None,
            [
                target.FakeRenameableDir(
                    'foo',
                    [target.FakeRenameableDir('.git', None)],
                )
            ]
        ),
        target.FakeRenameableDir(
            None,
            [
                target.FakeRenameableDir(
                    'foo',
                    [target.FakeRenameableDir('..git', None)],
                )
            ]
        ),
    ),
])
def test_escape_dot_git(initial, expected):
    state = copy.deepcopy(initial)
    # Once escaped, we should get to what was expected
    target._escape_unescape_dot_git(state, target._EscapeDirection.ESCAPE)
    assert state == expected
    # Once unescaped, we should get back to where we started since the escaping
    # mechanism is lossless.
    target._escape_unescape_dot_git(state, target._EscapeDirection.UNESCAPE)
    assert state == initial


def test_unescape_dot_git_raises():
    """Test that unescaping something with '.git' raises an exception."""
    with pytest.raises(RuntimeError):
        target._escape_unescape_dot_git(
            target.FakeRenameableDir(
                None,
                [target.FakeRenameableDir('.git', None)],
            ),
            direction=target._EscapeDirection.UNESCAPE,
        )


@pytest.mark.parametrize('direction', [
    target._EscapeDirection.ESCAPE,
    target._EscapeDirection.UNESCAPE,
])
def test_escape_dot_git_ordering(direction):
    """Test that renames happen in the correct order.

    ...git -> ....git must happen before ..git -> ...git to avoid a collision,
    and vice versa in the unescape case.
    """
    # Avoid '.git' as it isn't valid in the reverse direction
    inner2 = target.FakeRenameableDir('..git', None)
    inner3 = target.FakeRenameableDir('...git', None)
    inputs = [inner2, inner3]
    if direction is target._EscapeDirection.ESCAPE:
        expected_order = [inner3, inner2]
    else:
        expected_order = [inner2, inner3]
    for given_order in [inputs, reversed(inputs)]:
        top = target.FakeRenameableDir(None, given_order)
        target._escape_unescape_dot_git(top, direction)
        assert all(x is y for x, y in zip(top._rename_record, expected_order))


def test_empty_dir_to_tree(pygit2_repo, tmpdir):
    tree_hash = target.GitUbuntuRepository.dir_to_tree(
        pygit2_repo,
        str(tmpdir),
    )
    assert tree_hash == str(Tree({}).write(pygit2_repo))


def test_onefile_dir_to_tree(pygit2_repo, tmpdir):
    tmpdir.join('foo').write('bar')
    tree_hash = target.GitUbuntuRepository.dir_to_tree(
        pygit2_repo,
        str(tmpdir),
    )
    assert tree_hash == str(Tree({'foo': Blob(b'bar')}).write(pygit2_repo))


def test_git_escape_dir_to_tree(pygit2_repo, tmpdir):
    tmpdir.mkdir('.git')
    tree_hash = target.GitUbuntuRepository.dir_to_tree(
        pygit2_repo,
        str(tmpdir),
        escape=True,
    )
    assert tree_hash == str(Tree({'..git': Tree({})}).write(pygit2_repo))


@pytest.mark.parametrize('tree_data,expected_path', [
    # Empty tree -> default
    (Tree({}), 'debian/patches/series'),

    # Empty debian/patches directory -> default
    (Tree({'debian': Tree({'patches': Tree({})})}), 'debian/patches/series'),

    # Only debian/patches/series -> that one
    (
        Tree({'debian': Tree({'patches': Tree({'series': Blob(b'')})})}),
        'debian/patches/series',
    ),

    # Only debian/patches/debian.series -> that one
    (
        Tree({'debian': Tree({'patches': Tree({
            'debian.series': Blob(b'')
        })})}),
        'debian/patches/debian.series',
    ),

    # Both -> debian.series
    (
        Tree({'debian': Tree({'patches': Tree({
            'debian.series': Blob(b''),
            'series': Blob(b''),
        })})}),
        'debian/patches/debian.series',
    ),
])
def test_determine_quilt_series_path(pygit2_repo, tree_data, expected_path):
    tree_obj = pygit2_repo.get(tree_data.write(pygit2_repo))
    path = target.determine_quilt_series_path(pygit2_repo, tree_obj)
    assert path == expected_path


def test_quilt_env(pygit2_repo):
    tree_builder = Tree({'debian':
        Tree({'patches': Tree({'debian.series': Blob(b'')})})
    })
    tree_obj = pygit2_repo.get(tree_builder.write(pygit2_repo))
    env = target.quilt_env(pygit2_repo, tree_obj)
    assert env == {
        'EDITOR': 'true',
        'QUILT_NO_DIFF_INDEX': '1',
        'QUILT_NO_DIFF_TIMESTAMPS': '1',
        'QUILT_PATCHES': 'debian/patches',
        'QUILT_SERIES': 'debian/patches/debian.series',
    }


def test_repo_quilt_env(repo):
    tree_builder = Tree({'debian':
        Tree({'patches': Tree({'debian.series': Blob(b'')})})
    })
    tree_obj = repo.raw_repo.get(tree_builder.write(repo.raw_repo))
    env = repo.quilt_env(tree_obj)
    expected_inside = {
        'EDITOR': 'true',
        'QUILT_NO_DIFF_INDEX': '1',
        'QUILT_NO_DIFF_TIMESTAMPS': '1',
        'QUILT_PATCHES': 'debian/patches',
        'QUILT_SERIES': 'debian/patches/debian.series',
    }
    for k, v in expected_inside.items():
        assert env[k] == v

    # In addition to the settings above, check that
    # GitUbuntuRepository.quilt_env has correctly merged in the usual
    # environment. Testing that a few keys that we expect to be set are set
    # should suffice.
    expected_other_keys = ['HOME', 'GIT_DIR', 'GIT_WORK_TREE']
    for k in expected_other_keys:
        assert env[k]


def test_repo_quilt_env_from_treeish_str(repo):
    tree_builder = Tree({'debian':
        Tree({'patches': Tree({'debian.series': Blob(b'')})})
    })
    tree_obj = repo.raw_repo.get(tree_builder.write(repo.raw_repo))
    env = repo.quilt_env_from_treeish_str(str(tree_obj.id))
    expected_inside = {
        'EDITOR': 'true',
        'QUILT_NO_DIFF_INDEX': '1',
        'QUILT_NO_DIFF_TIMESTAMPS': '1',
        'QUILT_PATCHES': 'debian/patches',
        'QUILT_SERIES': 'debian/patches/debian.series',
    }
    for k, v in expected_inside.items():
        assert env[k] == v


def test_repo_derive_env_change(repo):
    # Changing the dictionary of a GitUbuntuRepository instance env attribute
    # must not have any effect on the env itself. While this may stretch a
    # little further than a normal instance property, it's worth enforcing this
    # as this particular attribute is at particular risk due to how it tends to
    # be used.
    e1 = repo.env
    e1[unittest.mock.sentinel.k] = unittest.mock.sentinel.v
    assert unittest.mock.sentinel.k not in repo.env


@pytest.mark.parametrize(
   'description, input_data, old_ubuntu, new_debian, expected',
   [
        (
            'Common case',
            Repo(
                commits=[
                    Commit.from_spec(
                        name='old/debian'
                    ),
                    Commit.from_spec(
                        parents=[Placeholder('old/debian')],
                        name='old/ubuntu',
                        changelog_versions=['1-1ubuntu1', '1-1'],
                    ),
                    Commit.from_spec(
                        parents=[Placeholder('old/debian')],
                        name='new/debian',
                        changelog_versions=['2-1', '1-1'],
                    ),
                ],
                tags={
                    'pkg/import/1-1': Placeholder('old/debian'),
                    'pkg/import/1-1ubuntu1': Placeholder('old/ubuntu'),
                    'pkg/import/2-1': Placeholder('new/debian'),
                },
            ),
            'pkg/import/1-1ubuntu1',
            'pkg/import/2-1',
            'pkg/import/1-1',
        ),
        (
            'Ubuntu delta based on a NMU',
            Repo(
                commits=[
                    Commit.from_spec(
                        name='fork_point'
                    ),
                    Commit.from_spec(
                        parents=[Placeholder('fork_point')],
                        name='old/debian',
                        changelog_versions=['1-1.1', '1-1'],
                    ),
                    Commit.from_spec(
                        parents=[Placeholder('old/debian')],
                        name='old/ubuntu',
                        changelog_versions=['1-1.1ubuntu1', '1-1.1', '1-1'],
                    ),
                    Commit.from_spec(
                        parents=[Placeholder('fork_point')],
                        name='new/debian',
                        changelog_versions=['2-1', '1-1'],
                    ),
                ],
                tags={
                    'pkg/import/1-1': Placeholder('fork_point'),
                    'pkg/import/1-1.1': Placeholder('old/debian'),
                    'pkg/import/1-1.1ubuntu1': Placeholder('old/ubuntu'),
                    'pkg/import/2-1': Placeholder('new/debian'),
                },
            ),
            'pkg/import/1-1.1ubuntu1',
            'pkg/import/2-1',
            'pkg/import/1-1.1',
        ),
        (
            'Ubuntu upstream version head of Debian',
            Repo(
                commits=[
                    Commit.from_spec(
                        name='old/debian'
                    ),
                    Commit.from_spec(
                        parents=[Placeholder('old/debian')],
                        name='mid_ubuntu',
                        changelog_versions=['1-1ubuntu1', '1-1'],
                    ),
                    Commit.from_spec(
                        parents=[Placeholder('mid_ubuntu')],
                        name='old/ubuntu',
                        changelog_versions=['2-0ubuntu1', '1-1ubuntu1', '1-1'],
                    ),
                    Commit.from_spec(
                        parents=[Placeholder('old/debian')],
                        name='new/debian',
                        changelog_versions=['3-1', '1-1'],
                    ),
                ],
                tags={
                    'pkg/import/1-1': Placeholder('old/debian'),
                    'pkg/import/1-1ubuntu1': Placeholder('mid_ubuntu'),
                    'pkg/import/2-0ubuntu1': Placeholder('old/ubuntu'),
                    'pkg/import/3-1': Placeholder('new/debian'),
                },
            ),
            'pkg/import/2-0ubuntu1',
            'pkg/import/3-1',
            'pkg/import/1-1',
        ),
   ],
)
def test_repo_find_ubuntu_merge(
   description,
   repo,
   input_data,
   old_ubuntu,
   new_debian,
   expected,
):
    input_data.write(repo.raw_repo)
    merge_base = repo.find_ubuntu_merge_base(old_ubuntu)

    assert merge_base

    assert str(
        repo.get_commitish(merge_base).peel(pygit2.Commit).id
    ) == str(
        repo.get_commitish(expected).peel(pygit2.Commit).id
    )


def test_repo_does_cleanup():
    path = tempfile.mkdtemp()
    try:
        repo = target.GitUbuntuRepository(
            path,
            delete_on_close=True,
        )
        repo.close()
        assert not os.path.exists(path)
    finally:
        shutil.rmtree(path, ignore_errors=True)


def test_repo_does_not_cleanup():
    path = tempfile.mkdtemp()
    try:
        repo = target.GitUbuntuRepository(
            path,
            delete_on_close=False,
        )
        repo.close()
        assert os.path.exists(path)
    finally:
        shutil.rmtree(path, ignore_errors=True)


@pytest.mark.parametrize(
    [
        'year',
        'month',
        'day',
        'hours',
        'minutes',
        'seconds',
        'milliseconds',
        'hour_delta',
        'expected',
    ], [
        (1970, 1, 1, 0, 0, 0, 0, 0, (0, 0)),
        (1970, 1, 1, 0, 0, 0, 600, 0, (0, 0)),
        (1970, 1, 1, 1, 0, 0, 0, 1, (0, 60)),
        (1970, 1, 1, 0, 0, 0, 0, -1, (3600, -60)),
        (1971, 2, 3, 4, 5, 6, 7, -8, (34430706, -480)),
    ]
)
def test_datetime_to_signature_spec(
    year,
    month,
    day,
    hours,
    minutes,
    seconds,
    milliseconds,
    hour_delta,
    expected,
):
    input_datetime = datetime.datetime(
        year,
        month,
        day,
        hours,
        minutes,
        seconds,
        milliseconds,
        datetime.timezone(datetime.timedelta(hours=hour_delta)),
    )
    actual = target.datetime_to_signature_spec(input_datetime)
    assert actual == expected


def test_commit_tree(repo):
    # Construct a repository with an initial commit on the master branch so
    # that we can verify later parentage and lack of branch movement
    parent_commit_oid = Repo(
        commits=[Commit(name='master')],
        branches={'master': Placeholder('master')}
    ).write(repo.raw_repo)
    repo.raw_repo.lookup_reference('HEAD').set_target('refs/heads/master')

    # Construct a tree inside the repository with a debian/changelog in it to
    # feed to the method under test
    test_changelog_path = os.path.join(
        pkg_resources.resource_filename('gitubuntu', 'changelog_tests'),
        'test_date_1',
    )
    with open(test_changelog_path, 'rb') as f:
        test_changelog_bytes = f.read()
    test_changelog_blob = Blob(test_changelog_bytes)
    source_tree = Tree({'debian': Tree({'changelog': test_changelog_blob})})
    source_tree_oid = source_tree.write(repo.raw_repo)

    # Call the method under test
    commit_oid = repo.commit_source_tree(
        tree=source_tree_oid,
        parents=[parent_commit_oid],
        log_message='test_commit_msg',
        commit_date=datetime.datetime(
            1971,  # year
            2,  # month
            3,  # day
            4,  # hours
            5,  # minutes
            6,  # seconds
            7,  # milliseconds (this should get truncated down to 0)
            datetime.timezone(datetime.timedelta(hours=-8))
        ),
    )

    # Retrieve the commit object created
    commit = repo.raw_repo[commit_oid]

    # Verify the commit object created is exactly as we expect it

    # Neither HEAD nor the master branch should have changed
    assert repo.raw_repo.lookup_reference('HEAD').target == 'refs/heads/master'
    master_ref = repo.raw_repo.lookup_reference('refs/heads/master')
    assert master_ref.target == parent_commit_oid

    # The tree of the commit should match the tree we requested
    assert commit.tree_id == source_tree_oid

    # The parent commits should match the parents we requested
    assert commit.parent_ids == [parent_commit_oid]

    # The commit message should match what we requested
    assert commit.message == 'test_commit_msg'

    # The commit author should match the changelog entry inside the tree given
    assert commit.author == pygit2.Signature(
        name='Test Maintainer',
        email='test-maintainer@donotmail.com',
        time=1463066074,
        offset=-420,
    )

    # The commit committer should match the date we requested (except the
    # milliseconds component) combined with the specification name and email
    assert commit.committer == pygit2.Signature(
        name=gitubuntu.spec.SYNTHESIZED_COMMITTER_NAME,
        email=gitubuntu.spec.SYNTHESIZED_COMMITTER_EMAIL,
        time=34430706,
        offset=-480,
    )


@pytest.mark.parametrize(['a', 'b', 'expected'],
    [
        ('root', 'root', False),
        ('child1', 'root', True),
        ('root', 'child1', False),
        ('grandchild1', 'root', True),
        ('child1', 'child2', False),
        ('root', 'disjoint', False),
    ]
)
def test_descendant_of(pygit2_repo, a, b, expected):
    """
    General test for pygit2.Repository.descendant_of().

    This test was formerly for a temporary alternative implementation of
    pygit2.Repository.descendant_of() in our GitUbuntuRepository wrapper for
    use until we updated our pinning to use a newer version of pygit2 that
    included the pygit2.Repository.descendant_of() method. After the pinning
    was removed and this implementation replacement took place, we kept the
    test to ensure that actual behaviour did not change.

    This unit tests validate_upload_tag() for various parameterized cases. The
    paramater sets assume the repository structure encoded in the Repo() call
    below.

    :param pygit2.Repository pygit2_repo: fixture providing a temporary
        pygit2.Repository instance to use
    :param str a: tag name of the first commit to pass to descendant_of()
    :param str b: tag name of the second commit to pass to descendant_of()
    :param bool expected: the expected result of descendant_of()
    """
    Repo(
        # Unique message parameters are used in each entry here in order to
        # ensure that otherwise-identical commits do not end up being the same
        # commit (with the same hash, etc).
        commits=[
            Commit(name='root', message='1'),
            Commit(name='child1', parents=[Placeholder('root')], message='2'),
            Commit(name='child2', parents=[Placeholder('root')], message='3'),
            Commit(
                name='grandchild1',
                parents=[Placeholder('child1')],
                message='4'
            ),
            Commit(name='disjoint', message='5'),
        ],
        tags={
            'root': Placeholder('root'),
            'child1': Placeholder('child1'),
            'child2': Placeholder('child2'),
            'grandchild1': Placeholder('grandchild1'),
            'disjoint': Placeholder('disjoint'),
        }
    ).write(pygit2_repo)

    def find_commit(tag_name):
        '''
        Fetch the pygit2.Commit object tagged with the given name

        :param str tag_name: the name of the tag to look up
        :returns: the commit object tagged with the given name
        :rtype: pygit2.Commit
        '''
        tag = pygit2_repo.lookup_reference('refs/tags/%s' % tag_name)
        return tag.peel(pygit2.Commit).id

    assert pygit2_repo.descendant_of(
        find_commit(a),
        find_commit(b),
    ) is expected


def test_create_tag(repo):
    """create_tag() should create a tag

    :param GitUbuntuRepository repo: fixture providing a temporary
        GitUbuntuRepository instance to use
    """
    Repo(
        commits=[Commit(name='root')],
        tags={'root': Placeholder('root')},
    ).write(repo.raw_repo)
    root_ref = repo.raw_repo.lookup_reference('refs/tags/root')
    root_commit = root_ref.peel(pygit2.Commit)
    repo.create_tag(
        commit_hash=str(root_commit.id),
        tag_name='test',
        tag_msg='msg',
        tagger=pygit2.Signature(
            name='Tagger',
            email='tagger@example.com',
            time=1,
            offset=2,
        )
    )
    result_ref = repo.raw_repo.lookup_reference('refs/tags/test')
    result_tag = result_ref.peel(pygit2.Tag)
    assert result_tag.tagger.name == 'Tagger'
    assert result_tag.tagger.email == 'tagger@example.com'
    assert result_tag.tagger.time == 1
    assert result_tag.tagger.offset == 2


def test_get_all_reimport_tags(repo):
    """get_all_reimport_tags() should return only matching reimport tags

    :param GitUbuntuRepository repo: fixture providing a temporary
        GitUbuntuRepository instance to use
    """
    Repo(
        commits=[Commit(name='root')],
        tags={
            'importer/reimport/import/1/0': Placeholder('root'),
            'importer/reimport/import/1/1': Placeholder('root'),

            # This entry must not be identified as one of version 1's reimport
            # tags, even though it shares the same base prefix.
            'importer/reimport/import/11/0': Placeholder('root'),
        },
    ).write(repo.raw_repo)
    tags = repo.get_all_reimport_tags('1', 'importer')
    tag_names = set([tag.name for tag in tags])
    assert tag_names == {
        'refs/tags/importer/reimport/import/1/0',
        'refs/tags/importer/reimport/import/1/1',
    }


def test_get_head_info(repo):
    """get_head_info() extracts expected commit and version info from the head
    commit
    """
    Repo(
        commits=[
            Commit(name='foo', tree=SourceTree(source=Source())),
        ],
        branches={
            'importer/ubuntu/foo': Placeholder('foo'),
            'importer/debian/bar': Placeholder('foo'),
            'other/ubuntu/baz': Placeholder('foo'),
        },
    ).write(repo.raw_repo)
    foo_commit_id = (
        repo.raw_repo
        .lookup_reference('refs/heads/importer/ubuntu/foo')
        .peel(pygit2.Commit).id
    )
    assert repo.get_head_info('ubuntu', 'importer') == {
        'importer/ubuntu/foo': HeadInfoItem(
            version='1-1',
            commit_time=0,
            commit_id=foo_commit_id,
        ),
    }
