1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
|
from __future__ import annotations
from typing import Any, List, Optional, Sequence, Tuple
from subprocess import CompletedProcess, run, CalledProcessError
from pathlib import Path
import textwrap
import sys
import os
import re
from .odb import Repository, Commit, Tree, Oid, Reference
class EditorError(Exception):
pass
def commit_range(base: Optional[Commit], tip: Commit) -> List[Commit]:
"""Oldest-first iterator over the given commit range,
not including the commit ``base``"""
commits = []
while tip != base:
commits.append(tip)
if tip.is_root and base is None:
break
tip = tip.parent()
commits.reverse()
return commits
def local_commits(repo: Repository, tip: Commit) -> Tuple[Commit, List[Commit]]:
"""Returns an oldest-first iterator over the local commits which are
parents of the specified commit. May return an empty list. A commit is
considered local if it is not present on any remote."""
# Keep track of the current base commit we're expecting. This serves two
# purposes. Firstly, it lets us return a base commit to our caller, and
# secondly it allows us to ensure the commits ``git log`` is producing form
# a single-parent chain from our initial commit.
base = tip
# Call `git log` to log out the OIDs of the commits in our specified range.
log = repo.git("log", base.oid.hex(), "--not", "--remotes", "--pretty=%H")
# Build a list of commits, validating each commit is part of a single-parent chain.
commits = []
for line in log.splitlines():
commit = repo.get_commit(Oid.fromhex(line.decode()))
# Ensure the commit we got is the parent of the previous logged commit.
if len(commit.parents()) != 1 or commit != base:
break
base = commit.parent()
# Add the commit to our list.
commits.append(commit)
# Reverse our list into oldest-first order.
commits.reverse()
return base, commits
def edit_file_with_editor(editor: str, path: Path) -> bytes:
try:
cmd = [sh_path(), "-ec", f'{editor} "$@"', editor, path.name]
run(cmd, check=True, cwd=path.parent)
except CalledProcessError as err:
raise EditorError(f"Editor exited with status {err}") from err
return path.read_bytes()
def get_commentchar(repo: Repository, text: bytes) -> bytes:
commentchar = repo.config("core.commentChar", default=b"#")
if commentchar == b"auto":
chars = bytearray(b"#;@!$%^&|:")
for line in text.splitlines():
try:
chars.remove(line[0])
except (ValueError, IndexError):
pass
try:
return chars[:1]
except IndexError as err:
raise EditorError(
"Unable to automatically select a comment character"
) from err
if commentchar == b"":
raise EditorError("core.commentChar must not be empty")
return commentchar
def strip_comments(
data: bytes, commentchar: bytes, allow_preceding_whitespace: bool
) -> bytes:
if allow_preceding_whitespace:
pat_is_comment_line = re.compile(br"^\s*" + re.escape(commentchar))
def is_comment_line(line: bytes) -> bool:
return bool(re.match(pat_is_comment_line, line))
else:
def is_comment_line(line: bytes) -> bool:
return line.startswith(commentchar)
lines = b""
for line in data.splitlines(keepends=True):
if not is_comment_line(line):
lines += line
lines = lines.rstrip()
if lines != b"":
lines += b"\n"
return lines
def run_specific_editor(
editor: str,
repo: Repository,
filename: str,
text: bytes,
comments: Optional[str] = None,
allow_empty: bool = False,
allow_whitespace_before_comments: bool = False,
) -> bytes:
"""Run the editor configured for git to edit the given text"""
path = repo.get_tempdir() / filename
commentchar = get_commentchar(repo, text)
with open(path, "wb") as handle:
for line in text.splitlines():
handle.write(line + b"\n")
if comments: # If comments were provided, write them after the text.
handle.write(b"\n")
for comment in textwrap.dedent(comments).splitlines():
handle.write(commentchar)
if comment:
handle.write(b" " + comment.encode("utf-8"))
handle.write(b"\n")
# Invoke the editor
data = edit_file_with_editor(editor, path)
if comments:
data = strip_comments(
data,
commentchar,
allow_preceding_whitespace=allow_whitespace_before_comments,
)
# Produce an error if the file was empty
if not (allow_empty or data):
raise EditorError("empty file - aborting")
return data
def git_editor(repo: Repository) -> str:
return repo.git("var", "GIT_EDITOR").decode()
def edit_file(repo: Repository, path: Path) -> bytes:
return edit_file_with_editor(git_editor(repo), path)
def run_editor(
repo: Repository,
filename: str,
text: bytes,
comments: Optional[str] = None,
allow_empty: bool = False,
) -> bytes:
"""Run the editor configured for git to edit the given text"""
return run_specific_editor(
editor=git_editor(repo),
repo=repo,
filename=filename,
text=text,
comments=comments,
allow_empty=allow_empty,
)
def git_sequence_editor(repo: Repository) -> str:
# This lookup order replicates the one used by git itself.
# See editor.c:sequence_editor.
editor = os.getenv("GIT_SEQUENCE_EDITOR")
if editor is None:
editor_bytes = repo.config("sequence.editor", default=None)
editor = editor_bytes.decode() if editor_bytes is not None else None
if editor is None:
editor = git_editor(repo)
return editor
def run_sequence_editor(
repo: Repository,
filename: str,
text: bytes,
comments: Optional[str] = None,
allow_empty: bool = False,
) -> bytes:
"""Run the editor configured for git to edit the given rebase/revise sequence"""
return run_specific_editor(
editor=git_sequence_editor(repo),
repo=repo,
filename=filename,
text=text,
comments=comments,
allow_empty=allow_empty,
allow_whitespace_before_comments=True,
)
def edit_commit_message(commit: Commit) -> Commit:
"""Launch an editor to edit the commit message of ``commit``, returning
a modified commit"""
repo = commit.repo
comments = (
"Please enter the commit message for your changes. Lines starting\n"
"with '#' will be ignored, and an empty message aborts the commit.\n"
)
# If the target commit is not a merge commit, produce a diff --stat to
# include in the commit message comments.
if len(commit.parents()) < 2:
tree_a = commit.parent_tree().persist().hex()
tree_b = commit.tree().persist().hex()
comments += "\n" + repo.git("diff-tree", "--stat", tree_a, tree_b).decode()
message = run_editor(repo, "COMMIT_EDITMSG", commit.message, comments=comments)
return commit.update(message=message)
def update_head(ref: Reference[Commit], new: Commit, expected: Optional[Tree]) -> None:
# Update the HEAD commit to point to the new value.
target_oid = ref.target.oid if ref.target else Oid.null()
print(f"Updating {ref.name} ({target_oid} => {new.oid})")
ref.update(new, "git-revise rewrite")
# We expect our tree to match the tree we started with (including index
# changes). If it does not, print out a warning.
if expected and new.tree() != expected:
print(
"(warning) unexpected final tree\n"
f"(note) expected: {expected.oid}\n"
f"(note) actual: {new.tree().oid}\n"
"(note) working directory & index have not been updated.\n"
"(note) use `git status` to see what has changed.",
file=sys.stderr,
)
def cut_commit(commit: Commit) -> Commit:
"""Perform a ``cut`` operation on the given commit, and return the
modified commit."""
print(f"Cutting commit {commit.oid.short()}")
print("Select changes to be included in part [1]:")
base_tree = commit.parent_tree()
final_tree = commit.tree()
# Create an environment with an explicit index file and the base tree.
#
# NOTE: The use of `skip_worktree` is only necessary due to `git reset
# --patch` unnecessarily invoking `git update-cache --refresh`. Doing the
# extra work to set the bit greatly improves the speed of the unnecessary
# refresh operation.
index = base_tree.to_index(
commit.repo.get_tempdir() / "TEMP_INDEX", skip_worktree=True
)
# Run an interactive git-reset to allow picking which pieces of the
# patch should go into the first part.
index.git("reset", "--patch", final_tree.persist().hex(), "--", ".", nocapture=True)
# Write out the newly created tree.
mid_tree = index.tree()
# Check if one or the other of the commits will be empty
if mid_tree == base_tree:
raise ValueError("cut part [1] is empty - aborting")
if mid_tree == final_tree:
raise ValueError("cut part [2] is empty - aborting")
# Build the first commit
part1 = commit.update(tree=mid_tree, message=b"[1] " + commit.message)
part1 = edit_commit_message(part1)
# Build the second commit
part2 = commit.update(parents=[part1], message=b"[2] " + commit.message)
part2 = edit_commit_message(part2)
return part2
def sh_path() -> str:
if os.name == "nt":
# On Windows, git is installed using Git for Windows, which installs
# into the "Git" directory in "%ProgramFiles%". Use the `sh.exe` file
# from that directory to perform shell operations, so they're executed
# in the expected environment.
return os.path.join(os.environ["PROGRAMFILES"], "Git", "bin", "sh.exe")
return "/bin/sh"
def sh_run(
cmd: Sequence[Any],
*args: Any,
**kwargs: Any,
) -> CompletedProcess[Any]:
"""Run a command within git's shell environment. This is the same as
subprocess.run on most platforms, but will enter the git-bash mingw
environment on Windows."""
if os.name == "nt":
cmd = (sh_path(), "-ec", 'exec "$0" "$@"', *cmd)
return run(cmd, *args, **kwargs) # pylint: disable=subprocess-run-check
|