File: vcs.py

package info (click to toggle)
cookiecutter 1.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 1,328 kB
  • sloc: python: 4,126; makefile: 242; sh: 4
file content (144 lines) | stat: -rw-r--r-- 4,409 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# -*- coding: utf-8 -*-

"""Helper functions for working with version control systems."""

from __future__ import unicode_literals
import logging
import os
import subprocess
import sys

from whichcraft import which

from .exceptions import (
    RepositoryNotFound, RepositoryCloneFailed, UnknownRepoType, VCSNotInstalled
)
from .prompt import read_user_yes_no
from .utils import make_sure_path_exists, rmtree

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Substrings looked for in the captured output of a failed VCS command in
# clone(); when one of them is present the failure is reported as a
# RepositoryCloneFailed for the requested checkout.
BRANCH_ERRORS = [
    'error: pathspec',
    'unknown revision',
]


def prompt_and_delete_repo(repo_dir, no_input=False):
    """Remove a previously-cloned repo, asking the user first unless told not to.

    When the user declines, the process exits via ``sys.exit()`` and the
    directory is left untouched.

    :param repo_dir: Directory of previously-cloned repo.
    :param no_input: Suppress prompt to delete repo and just delete it.
    """
    # Only interactive callers are asked; API callers (no_input) skip the prompt.
    if not no_input:
        prompt = (
            "You've cloned {} before. "
            "Is it okay to delete and re-clone it?"
        ).format(repo_dir)

        if not read_user_yes_no(prompt, 'yes'):
            sys.exit()

    rmtree(repo_dir)


def identify_repo(repo_url):
    """Determine if `repo_url` should be treated as a URL to a git or hg repo.

    Repos can be identified by prepending "hg+" or "git+" to the repo URL.
    Without such a prefix the type is guessed from the URL text: a URL
    containing "git" is treated as git, otherwise one containing
    "bitbucket" is treated as hg.

    :param repo_url: Repo URL of unknown type.
    :returns: ('git', repo_url) or ('hg', repo_url), where the returned URL
              has any leading "git+" / "hg+" type prefix removed.
    :raises UnknownRepoType: if an explicit type prefix is neither "git" nor
                             "hg", or if no type can be guessed from the URL.
    """
    # Split on the FIRST '+' only, so a '+' that is part of the URL itself
    # (e.g. a repo called "a+b") can never hide or corrupt the type prefix.
    prefix, plus, remainder = repo_url.partition('+')

    # Text before the '+' counts as a type prefix only when it is a bare
    # word; anything containing '/' or ':' is already part of the URL.
    if plus and '/' not in prefix and ':' not in prefix:
        if prefix in ('git', 'hg'):
            return prefix, remainder
        raise UnknownRepoType

    # No explicit prefix: fall back to guessing from the URL text.
    if 'git' in repo_url:
        return 'git', repo_url
    if 'bitbucket' in repo_url:
        return 'hg', repo_url
    raise UnknownRepoType


def is_vcs_installed(repo_type):
    """Report whether the executable for a repo type can be located.

    :param repo_type: Name of the VCS executable to look up via ``which``
                      (e.g. the type returned by ``identify_repo``).
    :returns: True if ``which`` returned a truthy result, False otherwise.
    """
    located = which(repo_type)
    return True if located else False


def clone(repo_url, checkout=None, clone_to_dir='.', no_input=False):
    """Clone a repo into `clone_to_dir` and optionally check out a revision.

    If the target directory of the clone already exists, it is removed first
    (after prompting, unless `no_input` is set).

    :param repo_url: Repo URL of unknown type.
    :param checkout: The branch, tag or commit ID to checkout after clone.
    :param clone_to_dir: The directory to clone to.
                         Defaults to the current directory.
    :param no_input: Suppress all user prompts when calling via API.
    :returns: Normalized path of the directory the repo was cloned into.
    :raises VCSNotInstalled: if the executable for the repo type is missing.
    :raises RepositoryNotFound: if the VCS output contains "not found".
    :raises RepositoryCloneFailed: if the VCS output matches one of
                                   BRANCH_ERRORS (bad `checkout` value).
    :raises subprocess.CalledProcessError: for any other VCS failure.
    """
    # Ensure that clone_to_dir exists
    clone_to_dir = os.path.expanduser(clone_to_dir)
    make_sure_path_exists(clone_to_dir)

    # identify the repo_type
    repo_type, repo_url = identify_repo(repo_url)

    # check that the appropriate VCS for the repo_type is installed
    if not is_vcs_installed(repo_type):
        msg = "'{0}' is not installed.".format(repo_type)
        raise VCSNotInstalled(msg)

    # Work out the directory name the VCS will create inside clone_to_dir.
    repo_url = repo_url.rstrip('/')
    repo_name = os.path.split(repo_url)[1]
    if repo_type == 'git':
        # scp-like URLs (user@host:repo.git) have no '/' before the name.
        repo_name = repo_name.split(':')[-1]
        # Drop only a trailing '.git'; a '.git' elsewhere in the name
        # (e.g. 'foo.github.io') belongs to the directory name.
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-len('.git')]
    repo_dir = os.path.normpath(os.path.join(clone_to_dir, repo_name))
    logger.debug('repo_dir is %s', repo_dir)

    if os.path.isdir(repo_dir):
        prompt_and_delete_repo(repo_dir, no_input=no_input)

    try:
        subprocess.check_output(
            [repo_type, 'clone', repo_url],
            cwd=clone_to_dir,
            stderr=subprocess.STDOUT,
        )
        if checkout is not None:
            subprocess.check_output(
                [repo_type, 'checkout', checkout],
                cwd=repo_dir,
                stderr=subprocess.STDOUT,
            )
    except subprocess.CalledProcessError as clone_error:
        # check_output captures bytes; decode before comparing against text
        # so the substring checks below cannot fail with a TypeError
        # (Python 3) or an implicit-ASCII decode error (Python 2).
        output = clone_error.output or b''
        if isinstance(output, bytes):
            output = output.decode('utf-8', 'replace')

        if 'not found' in output.lower():
            raise RepositoryNotFound(
                'The repository {} could not be found, '
                'have you made a typo?'.format(repo_url)
            )
        if any(error in output for error in BRANCH_ERRORS):
            raise RepositoryCloneFailed(
                'The {} branch of repository {} could not found, '
                'have you made a typo?'.format(checkout, repo_url)
            )
        # Anything else is re-raised unchanged for the caller to handle.
        raise

    return repo_dir