1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
|
from __future__ import annotations
import json
import logging
import os
from collections.abc import Sequence
from typing import Any
import pre_commit.constants as C
from pre_commit.all_languages import languages
from pre_commit.clientlib import load_manifest
from pre_commit.clientlib import LOCAL
from pre_commit.clientlib import META
from pre_commit.hook import Hook
from pre_commit.lang_base import environment_dir
from pre_commit.prefix import Prefix
from pre_commit.store import Store
from pre_commit.util import clean_path_on_failure
from pre_commit.util import rmtree
logger = logging.getLogger('pre_commit')
def _state_filename_v1(venv: str) -> str:
return os.path.join(venv, '.install_state_v1')
def _state_filename_v2(venv: str) -> str:
return os.path.join(venv, '.install_state_v2')
def _state(additional_deps: Sequence[str]) -> object:
return {'additional_dependencies': additional_deps}
def _read_state(venv: str) -> object | None:
filename = _state_filename_v1(venv)
if not os.path.exists(filename):
return None
else:
with open(filename) as f:
return json.load(f)
def _hook_installed(hook: Hook) -> bool:
lang = languages[hook.language]
if lang.ENVIRONMENT_DIR is None:
return True
venv = environment_dir(
hook.prefix,
lang.ENVIRONMENT_DIR,
hook.language_version,
)
return (
(
os.path.exists(_state_filename_v2(venv)) or
_read_state(venv) == _state(hook.additional_dependencies)
) and
not lang.health_check(hook.prefix, hook.language_version)
)
def _hook_install(hook: Hook) -> None:
logger.info(f'Installing environment for {hook.src}.')
logger.info('Once installed this environment will be reused.')
logger.info('This may take a few minutes...')
lang = languages[hook.language]
assert lang.ENVIRONMENT_DIR is not None
venv = environment_dir(
hook.prefix,
lang.ENVIRONMENT_DIR,
hook.language_version,
)
# There's potentially incomplete cleanup from previous runs
# Clean it up!
if os.path.exists(venv):
rmtree(venv)
with clean_path_on_failure(venv):
lang.install_environment(
hook.prefix, hook.language_version, hook.additional_dependencies,
)
health_error = lang.health_check(hook.prefix, hook.language_version)
if health_error:
raise AssertionError(
f'BUG: expected environment for {hook.language} to be healthy '
f'immediately after install, please open an issue describing '
f'your environment\n\n'
f'more info:\n\n{health_error}',
)
# TODO: remove v1 state writing, no longer needed after pre-commit 3.0
# Write our state to indicate we're installed
state_filename = _state_filename_v1(venv)
staging = f'{state_filename}staging'
with open(staging, 'w') as state_file:
state_file.write(json.dumps(_state(hook.additional_dependencies)))
# Move the file into place atomically to indicate we've installed
os.replace(staging, state_filename)
open(_state_filename_v2(venv), 'a+').close()
def _hook(
*hook_dicts: dict[str, Any],
root_config: dict[str, Any],
) -> dict[str, Any]:
ret, rest = dict(hook_dicts[0]), hook_dicts[1:]
for dct in rest:
ret.update(dct)
lang = ret['language']
if ret['language_version'] == C.DEFAULT:
ret['language_version'] = root_config['default_language_version'][lang]
if ret['language_version'] == C.DEFAULT:
ret['language_version'] = languages[lang].get_default_version()
if not ret['stages']:
ret['stages'] = root_config['default_stages']
if languages[lang].ENVIRONMENT_DIR is None:
if ret['language_version'] != C.DEFAULT:
logger.error(
f'The hook `{ret["id"]}` specifies `language_version` but is '
f'using language `{lang}` which does not install an '
f'environment. '
f'Perhaps you meant to use a specific language?',
)
exit(1)
if ret['additional_dependencies']:
logger.error(
f'The hook `{ret["id"]}` specifies `additional_dependencies` '
f'but is using language `{lang}` which does not install an '
f'environment. '
f'Perhaps you meant to use a specific language?',
)
exit(1)
return ret
def _non_cloned_repository_hooks(
repo_config: dict[str, Any],
store: Store,
root_config: dict[str, Any],
) -> tuple[Hook, ...]:
def _prefix(language_name: str, deps: Sequence[str]) -> Prefix:
language = languages[language_name]
# pygrep / script / system / docker_image do not have
# environments so they work out of the current directory
if language.ENVIRONMENT_DIR is None:
return Prefix(os.getcwd())
else:
return Prefix(store.make_local(deps))
return tuple(
Hook.create(
repo_config['repo'],
_prefix(hook['language'], hook['additional_dependencies']),
_hook(hook, root_config=root_config),
)
for hook in repo_config['hooks']
)
def _cloned_repository_hooks(
repo_config: dict[str, Any],
store: Store,
root_config: dict[str, Any],
) -> tuple[Hook, ...]:
repo, rev = repo_config['repo'], repo_config['rev']
manifest_path = os.path.join(store.clone(repo, rev), C.MANIFEST_FILE)
by_id = {hook['id']: hook for hook in load_manifest(manifest_path)}
for hook in repo_config['hooks']:
if hook['id'] not in by_id:
logger.error(
f'`{hook["id"]}` is not present in repository {repo}. '
f'Typo? Perhaps it is introduced in a newer version? '
f'Often `pre-commit autoupdate` fixes this.',
)
exit(1)
hook_dcts = [
_hook(by_id[hook['id']], hook, root_config=root_config)
for hook in repo_config['hooks']
]
return tuple(
Hook.create(
repo_config['repo'],
Prefix(store.clone(repo, rev, hook['additional_dependencies'])),
hook,
)
for hook in hook_dcts
)
def _repository_hooks(
repo_config: dict[str, Any],
store: Store,
root_config: dict[str, Any],
) -> tuple[Hook, ...]:
if repo_config['repo'] in {LOCAL, META}:
return _non_cloned_repository_hooks(repo_config, store, root_config)
else:
return _cloned_repository_hooks(repo_config, store, root_config)
def install_hook_envs(hooks: Sequence[Hook], store: Store) -> None:
def _need_installed() -> list[Hook]:
seen: set[tuple[Prefix, str, str, tuple[str, ...]]] = set()
ret = []
for hook in hooks:
if hook.install_key not in seen and not _hook_installed(hook):
ret.append(hook)
seen.add(hook.install_key)
return ret
if not _need_installed():
return
with store.exclusive_lock():
# Another process may have already completed this work
for hook in _need_installed():
_hook_install(hook)
def all_hooks(root_config: dict[str, Any], store: Store) -> tuple[Hook, ...]:
return tuple(
hook
for repo in root_config['repos']
for hook in _repository_hooks(repo, store, root_config)
)
|