1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
|
# Copyright (C) 2020 Sebastian Pipping <sebastian@pipping.org>
# Licensed under GPL v3 or later
import os
import subprocess
from collections import OrderedDict
from typing import Optional
from ._metadata import APP
class GitException(Exception):
pass
class CheckoutFailed(GitException):
pass
class PullFailed(GitException):
pass
class MergeBaseFailed(GitException):
pass
class Git:
_GIT = "git"
_GIT_ENCODING = "utf-8"
_APP_EMAIL = f"{APP}@localhost"
_ARBITRARY_FIXED_DATETIME = "2005-12-21T00:00:00+00:00" # release date of Git 1.0.0
_COMMIT_ENVIRON = {
"GIT_AUTHOR_DATE": _ARBITRARY_FIXED_DATETIME,
"GIT_AUTHOR_EMAIL": _APP_EMAIL,
"GIT_AUTHOR_NAME": APP,
"GIT_COMMITTER_DATE": _ARBITRARY_FIXED_DATETIME,
"GIT_COMMITTER_EMAIL": _APP_EMAIL,
"GIT_COMMITTER_NAME": APP,
}
def __init__(self, messenger, pretend, verbose, work_dir=None):
self._messenger = messenger
self._verbose = verbose
self._pretend = pretend
self._working_directory = work_dir
@property
def pretend(self):
return self._pretend
def _wrap_subprocess(self, subprocess_function, argv, is_write, pretend_result, env):
pretend = is_write and self._pretend
if self._verbose:
comment = "skipped due to --dry-run" if pretend else ""
display_argv = [a for a in argv if not a.startswith("--format=")]
self._messenger.tell_command(display_argv, comment)
if pretend:
return pretend_result
return subprocess_function(argv, cwd=self._working_directory, env=env)
def _subprocess_check_output(self, argv, is_write, env=None):
return self._wrap_subprocess(
subprocess.check_output, argv=argv, is_write=is_write, pretend_result=b"", env=env
)
def _subprocess_check_call(self, argv, is_write, env=None):
return self._wrap_subprocess(
subprocess.check_call, argv=argv, is_write=is_write, pretend_result=0, env=env
)
@classmethod
def _output_bytes_to_lines(cls, output_bytes) -> list[str]:
text = output_bytes.decode(cls._GIT_ENCODING).rstrip()
if not text: # protect against this: ''.split('\n') -> ['']
return []
return text.split("\n")
def extract_git_config(self):
argv = [self._GIT, "config", "--list", "--null"]
output_bytes = self._subprocess_check_output(argv, is_write=False)
key_newline_value_list = [
chunk.decode(self._GIT_ENCODING) for chunk in output_bytes.split(b"\0")
]
git_config = OrderedDict()
for key_newline_value in key_newline_value_list:
if not key_newline_value:
continue
try:
key, value = key_newline_value.split("\n", 1)
except ValueError:
self._messenger.tell_info(
f"Git config option {key_newline_value!r} lacks assignment of a value."
)
continue
git_config[key] = value
return git_config
def find_remotes(self):
argv = [self._GIT, "remote"]
output_bytes = self._subprocess_check_output(argv, is_write=False)
return self._output_bytes_to_lines(output_bytes)
def _find_branches(self, extra_argv=None, strip_left: int = 2) -> list[str]:
# strip_left==1 strips leading "refs/"
# strip_left==2 strips leading "refs/heads/" and "refs/remotes/"
argv = [
self._GIT,
"-c",
"column.branch=plain",
"branch",
f"--format=%(refname:lstrip={strip_left})",
]
if extra_argv is not None:
argv += extra_argv
output_bytes = self._subprocess_check_output(argv, is_write=False)
lines = self._output_bytes_to_lines(output_bytes)
return [
line for line in lines if not line.endswith("/HEAD") and "HEAD detached at" not in line
]
def find_local_branches(self) -> list[str]:
return self._find_branches()
def find_all_branch_refs(self) -> list[str]:
return self._find_branches(["--all"])
def find_all_branch_names(self) -> set[str]:
branch_names = set()
for line in self._find_branches(["--all"], strip_left=1):
heads_or_remotes, *remainder = line.split("/")
if heads_or_remotes == "heads":
branch_name = "/".join(remainder)
elif heads_or_remotes == "remotes":
branch_name = "/".join(remainder[1:])
else:
raise ValueError(f"Reference {line!r} not understood")
branch_names.add(branch_name)
return branch_names
def find_remote_branches_at(self, remote_name) -> list[str]:
assert remote_name
extra_argv = ["--remote", "--list", f"{remote_name}/*"]
return self._find_branches(extra_argv)
def find_current_branch(self) -> Optional[str]:
# Note: Avoiding "git branch --show-current" of Git >=2.22.0
# to keep Git 2.17.1 of Ubuntu 18.04 in the boat, for now.
argv = [self._GIT, "rev-parse", "--symbolic-full-name", "HEAD"]
output_bytes = self._subprocess_check_output(argv, is_write=False)
lines = self._output_bytes_to_lines(output_bytes)
assert len(lines) == 1
expected_prefix = "refs/heads/"
reference = lines[0] # 'HEAD' when detached, else 'refs/heads/<branch>'
if not reference.startswith(expected_prefix):
return None # detached head
return reference[len(expected_prefix) :]
def find_working_tree_branches(self) -> list[Optional[str]]:
argv = [self._GIT, "worktree", "list", "--porcelain"] # requires Git >=2.7.0
output_bytes = self._subprocess_check_output(argv, is_write=False)
lines = self._output_bytes_to_lines(output_bytes)
detached_line_prefix = "detached"
branch_line_prefix = "branch "
branch_prefix = "refs/heads/"
branch_names: list[Optional[str]] = []
for line in lines:
if line.startswith(detached_line_prefix):
branch_names.append(None)
elif line.startswith(branch_line_prefix):
branch_name = line[len(branch_line_prefix) :]
if branch_name.startswith(branch_prefix):
branch_name = branch_name[len(branch_prefix) :]
branch_names.append(branch_name)
return branch_names
def has_detached_heads(self) -> bool:
return None in self.find_working_tree_branches()
def _get_merged_branches_for(self, target_branch: str, remote: bool):
extra_argv = []
if remote:
extra_argv.append("--remote")
extra_argv += [
"--merged",
target_branch,
]
merged_branches = self._find_branches(extra_argv)
return (branch for branch in merged_branches if branch != target_branch)
def find_merged_local_branches_for(self, branch_name):
return self._get_merged_branches_for(branch_name, remote=False)
def find_merged_remote_branches_for(self, remote_name, branch_name):
return self._get_merged_branches_for(f"{remote_name}/{branch_name}", remote=True)
def delete_local_branches(self, branch_names, force=False):
if not branch_names:
return
argv = [self._GIT, "branch", "--delete"]
if force:
argv.append("--force")
argv += sorted(branch_names)
self._subprocess_check_call(argv, is_write=True)
def delete_remote_branches(self, branch_names, remote_name):
if not branch_names:
return
remote_prefix = f"{remote_name}/"
remote_branches_to_delete = [
"refs/heads/" + remote_slash_branch[len(remote_prefix) :]
for remote_slash_branch in branch_names
if remote_slash_branch.startswith(remote_prefix)
]
if not remote_branches_to_delete:
return
argv = [
self._GIT,
"push",
"--delete",
"--force-with-lease",
remote_name,
] + remote_branches_to_delete
self._subprocess_check_output(argv, is_write=True)
def set_config(self, key, value):
argv = [self._GIT, "config"]
if value is None:
argv += ["--unset", key]
else:
argv += [key, value]
self._subprocess_check_output(argv, is_write=True)
def update_and_prune_remote(self, remote_name: str) -> None:
argv = [self._GIT, "remote", "update", "--prune", remote_name]
self._subprocess_check_output(argv, is_write=True)
def checkout(self, committish: str) -> None:
argv = [self._GIT, "checkout", "-q"]
argv.append(committish)
try:
self._subprocess_check_output(argv, is_write=True)
except subprocess.CalledProcessError as e:
if e.returncode == 1:
raise CheckoutFailed
raise
def pull_ff_only(self) -> None:
argv = [self._GIT, "pull", "--ff-only"]
try:
self._subprocess_check_output(argv, is_write=True)
except subprocess.CalledProcessError as e:
if e.returncode == 1:
raise PullFailed
raise
def _has_changes(self, extra_argv: Optional[list[str]] = None) -> bool:
argv = [self._GIT, "diff", "--exit-code", "--quiet"]
if extra_argv:
argv += extra_argv
try:
self._subprocess_check_output(argv, is_write=False)
return False
except subprocess.CalledProcessError as e:
if e.returncode == 1:
return True
raise
def has_staged_changes(self) -> bool:
return self._has_changes(["--cached"])
def has_uncommitted_changes(self) -> bool:
if self._has_changes():
return True
return self.has_staged_changes()
def cherry(self, target_branch, topic_branch) -> list[str]:
argv = [self._GIT, "cherry", target_branch, topic_branch]
output_bytes = self._subprocess_check_output(argv, is_write=False)
return self._output_bytes_to_lines(output_bytes)
def has_unpushed_commits_on(self, branch, with_regard_to):
cherry_lines = self.cherry(with_regard_to, branch)
return any(line.startswith("+") for line in cherry_lines)
def commit_tree(self, message: str, parent_committish: str, tree: str) -> str:
argv = [self._GIT, "commit-tree", "-m", message, "-p", parent_committish, tree]
env = os.environ.copy()
env.update(self._COMMIT_ENVIRON)
# Note: Command "git commit-tree" does write to the repository but it does
# not switch branches, move HEAD or delete things; that's why it
# it is considered "not writing" (``is_write=False``) here and
# will be performed even when ``--dry-run``/``self._pretend`` is active.
output_bytes = self._subprocess_check_output(argv, env=env, is_write=False)
lines = self._output_bytes_to_lines(output_bytes)
assert len(lines) == 1
return lines[0]
def merge_base(self, target_branch, topic_branch) -> str:
argv = [self._GIT, "merge-base", target_branch, topic_branch]
try:
output_bytes = self._subprocess_check_output(argv, is_write=False)
except subprocess.CalledProcessError as e:
if e.returncode == 1:
raise MergeBaseFailed
raise
lines = self._output_bytes_to_lines(output_bytes)
assert len(lines) == 1
return lines[0]
|