1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
|
from __future__ import annotations
import concurrent.futures
import os.path
import re
import tempfile
from collections.abc import Sequence
from typing import Any
from typing import NamedTuple
import pre_commit.constants as C
from pre_commit import git
from pre_commit import output
from pre_commit import xargs
from pre_commit.clientlib import InvalidManifestError
from pre_commit.clientlib import load_config
from pre_commit.clientlib import load_manifest
from pre_commit.clientlib import LOCAL
from pre_commit.clientlib import META
from pre_commit.commands.migrate_config import migrate_config
from pre_commit.util import CalledProcessError
from pre_commit.util import cmd_output
from pre_commit.util import cmd_output_b
from pre_commit.yaml import yaml_dump
from pre_commit.yaml import yaml_load
class RevInfo(NamedTuple):
repo: str
rev: str
frozen: str | None = None
hook_ids: frozenset[str] = frozenset()
@classmethod
def from_config(cls, config: dict[str, Any]) -> RevInfo:
return cls(config['repo'], config['rev'])
def update(self, tags_only: bool, freeze: bool) -> RevInfo:
with tempfile.TemporaryDirectory() as tmp:
_git = ('git', *git.NO_FS_MONITOR, '-C', tmp)
if tags_only:
tag_opt = '--abbrev=0'
else:
tag_opt = '--exact'
tag_cmd = (*_git, 'describe', 'FETCH_HEAD', '--tags', tag_opt)
git.init_repo(tmp, self.repo)
cmd_output_b(*_git, 'config', 'extensions.partialClone', 'true')
cmd_output_b(
*_git, 'fetch', 'origin', 'HEAD',
'--quiet', '--filter=blob:none', '--tags',
)
try:
rev = cmd_output(*tag_cmd)[1].strip()
except CalledProcessError:
rev = cmd_output(*_git, 'rev-parse', 'FETCH_HEAD')[1].strip()
else:
if tags_only:
rev = git.get_best_candidate_tag(rev, tmp)
frozen = None
if freeze:
exact = cmd_output(*_git, 'rev-parse', rev)[1].strip()
if exact != rev:
rev, frozen = exact, rev
try:
# workaround for windows -- see #2865
cmd_output_b(*_git, 'show', f'{rev}:{C.MANIFEST_FILE}')
cmd_output(*_git, 'checkout', rev, '--', C.MANIFEST_FILE)
except CalledProcessError:
pass # this will be caught by manifest validating code
try:
manifest = load_manifest(os.path.join(tmp, C.MANIFEST_FILE))
except InvalidManifestError as e:
raise RepositoryCannotBeUpdatedError(f'[{self.repo}] {e}')
else:
hook_ids = frozenset(hook['id'] for hook in manifest)
return self._replace(rev=rev, frozen=frozen, hook_ids=hook_ids)
class RepositoryCannotBeUpdatedError(RuntimeError):
pass
def _check_hooks_still_exist_at_rev(
repo_config: dict[str, Any],
info: RevInfo,
) -> None:
# See if any of our hooks were deleted with the new commits
hooks = {hook['id'] for hook in repo_config['hooks']}
hooks_missing = hooks - info.hook_ids
if hooks_missing:
raise RepositoryCannotBeUpdatedError(
f'[{info.repo}] Cannot update because the update target is '
f'missing these hooks: {", ".join(sorted(hooks_missing))}',
)
def _update_one(
i: int,
repo: dict[str, Any],
*,
tags_only: bool,
freeze: bool,
) -> tuple[int, RevInfo, RevInfo]:
old = RevInfo.from_config(repo)
new = old.update(tags_only=tags_only, freeze=freeze)
_check_hooks_still_exist_at_rev(repo, new)
return i, old, new
REV_LINE_RE = re.compile(r'^(\s+)rev:(\s*)([\'"]?)([^\s#]+)(.*)(\r?\n)$')
def _original_lines(
path: str,
rev_infos: list[RevInfo | None],
retry: bool = False,
) -> tuple[list[str], list[int]]:
"""detect `rev:` lines or reformat the file"""
with open(path, newline='') as f:
original = f.read()
lines = original.splitlines(True)
idxs = [i for i, line in enumerate(lines) if REV_LINE_RE.match(line)]
if len(idxs) == len(rev_infos):
return lines, idxs
elif retry:
raise AssertionError('could not find rev lines')
else:
with open(path, 'w') as f:
f.write(yaml_dump(yaml_load(original)))
return _original_lines(path, rev_infos, retry=True)
def _write_new_config(path: str, rev_infos: list[RevInfo | None]) -> None:
lines, idxs = _original_lines(path, rev_infos)
for idx, rev_info in zip(idxs, rev_infos):
if rev_info is None:
continue
match = REV_LINE_RE.match(lines[idx])
assert match is not None
new_rev_s = yaml_dump({'rev': rev_info.rev}, default_style=match[3])
new_rev = new_rev_s.split(':', 1)[1].strip()
if rev_info.frozen is not None:
comment = f' # frozen: {rev_info.frozen}'
elif match[5].strip().startswith('# frozen:'):
comment = ''
else:
comment = match[5]
lines[idx] = f'{match[1]}rev:{match[2]}{new_rev}{comment}{match[6]}'
with open(path, 'w', newline='') as f:
f.write(''.join(lines))
def autoupdate(
config_file: str,
tags_only: bool,
freeze: bool,
repos: Sequence[str] = (),
jobs: int = 1,
) -> int:
"""Auto-update the pre-commit config to the latest versions of repos."""
migrate_config(config_file, quiet=True)
changed = False
retv = 0
config_repos = [
repo for repo in load_config(config_file)['repos']
if repo['repo'] not in {LOCAL, META}
]
rev_infos: list[RevInfo | None] = [None] * len(config_repos)
jobs = jobs or xargs.cpu_count() # 0 => number of cpus
jobs = min(jobs, len(repos) or len(config_repos)) # max 1-per-thread
jobs = max(jobs, 1) # at least one thread
with concurrent.futures.ThreadPoolExecutor(jobs) as exe:
futures = [
exe.submit(
_update_one,
i, repo, tags_only=tags_only, freeze=freeze,
)
for i, repo in enumerate(config_repos)
if not repos or repo['repo'] in repos
]
for future in concurrent.futures.as_completed(futures):
try:
i, old, new = future.result()
except RepositoryCannotBeUpdatedError as e:
output.write_line(str(e))
retv = 1
else:
if new.rev != old.rev:
changed = True
if new.frozen:
new_s = f'{new.frozen} (frozen)'
else:
new_s = new.rev
msg = f'updating {old.rev} -> {new_s}'
rev_infos[i] = new
else:
msg = 'already up to date!'
output.write_line(f'[{old.repo}] {msg}')
if changed:
_write_new_config(config_file, rev_infos)
return retv
|