File: docker.py

package info (click to toggle)
pre-commit 4.5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,452 kB
  • sloc: python: 14,482; sh: 87; makefile: 4
file content (181 lines) | stat: -rw-r--r-- 5,412 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from __future__ import annotations

import contextlib
import functools
import hashlib
import json
import os
import re
from collections.abc import Sequence

from pre_commit import lang_base
from pre_commit.prefix import Prefix
from pre_commit.util import CalledProcessError
from pre_commit.util import cmd_output_b

ENVIRONMENT_DIR = 'docker'
PRE_COMMIT_LABEL = 'PRE_COMMIT'
get_default_version = lang_base.basic_get_default_version
health_check = lang_base.basic_health_check
in_env = lang_base.no_env  # no special environment for docker

_HOSTNAME_MOUNT_RE = re.compile(
    rb"""
    /containers
    (?:/overlay-containers)?
    /([a-z0-9]{64})
    (?:/userdata)?
    /hostname
    """,
    re.VERBOSE,
)


def _get_container_id() -> str | None:
    with contextlib.suppress(FileNotFoundError):
        with open('/proc/1/mountinfo', 'rb') as f:
            for line in f:
                m = _HOSTNAME_MOUNT_RE.search(line)
                if m:
                    return m[1].decode()

    return None


def _get_docker_path(path: str) -> str:
    container_id = _get_container_id()
    if container_id is None:
        return path

    try:
        _, out, _ = cmd_output_b('docker', 'inspect', container_id)
    except CalledProcessError:
        # self-container was not visible from here (perhaps docker-in-docker)
        return path

    container, = json.loads(out)
    for mount in container['Mounts']:
        src_path = mount['Source']
        to_path = mount['Destination']
        if os.path.commonpath((path, to_path)) == to_path:
            # So there is something in common,
            # and we can proceed remapping it
            return path.replace(to_path, src_path)
    # we're in Docker, but the path is not mounted, cannot really do anything,
    # so fall back to original path
    return path


def md5(s: str) -> str:  # pragma: win32 no cover
    return hashlib.md5(s.encode()).hexdigest()


def docker_tag(prefix: Prefix) -> str:  # pragma: win32 no cover
    md5sum = md5(os.path.basename(prefix.prefix_dir)).lower()
    return f'pre-commit-{md5sum}'


def build_docker_image(
        prefix: Prefix,
        *,
        pull: bool,
) -> None:  # pragma: win32 no cover
    cmd: tuple[str, ...] = (
        'docker', 'build',
        '--tag', docker_tag(prefix),
        '--label', PRE_COMMIT_LABEL,
    )
    if pull:
        cmd += ('--pull',)
    # This must come last for old versions of docker.  See #477
    cmd += ('.',)
    lang_base.setup_cmd(prefix, cmd)


def install_environment(
        prefix: Prefix, version: str, additional_dependencies: Sequence[str],
) -> None:  # pragma: win32 no cover
    lang_base.assert_version_default('docker', version)
    lang_base.assert_no_additional_deps('docker', additional_dependencies)

    directory = lang_base.environment_dir(prefix, ENVIRONMENT_DIR, version)

    # Docker doesn't really have relevant disk environment, but pre-commit
    # still needs to cleanup its state files on failure
    build_docker_image(prefix, pull=True)
    os.mkdir(directory)


@functools.lru_cache(maxsize=1)
def _is_rootless() -> bool:  # pragma: win32 no cover
    retcode, out, _ = cmd_output_b(
        'docker', 'system', 'info', '--format', '{{ json . }}',
    )
    if retcode != 0:
        return False

    info = json.loads(out)
    try:
        return (
            # docker:
            # https://docs.docker.com/reference/api/engine/version/v1.48/#tag/System/operation/SystemInfo
            'name=rootless' in (info.get('SecurityOptions') or ()) or
            # podman:
            # https://docs.podman.io/en/latest/_static/api.html?version=v5.4#tag/system/operation/SystemInfoLibpod
            info['host']['security']['rootless']
        )
    except KeyError:
        return False


def get_docker_user() -> tuple[str, ...]:  # pragma: win32 no cover
    if _is_rootless():
        return ()

    try:
        return ('-u', f'{os.getuid()}:{os.getgid()}')
    except AttributeError:
        return ()


def get_docker_tty(*, color: bool) -> tuple[str, ...]:  # pragma: win32 no cover  # noqa: E501
    return (('--tty',) if color else ())


def docker_cmd(*, color: bool) -> tuple[str, ...]:  # pragma: win32 no cover
    return (
        'docker', 'run',
        '--rm',
        *get_docker_tty(color=color),
        *get_docker_user(),
        # https://docs.docker.com/engine/reference/commandline/run/#mount-volumes-from-container-volumes-from
        # The `Z` option tells Docker to label the content with a private
        # unshared label. Only the current container can use a private volume.
        '-v', f'{_get_docker_path(os.getcwd())}:/src:rw,Z',
        '--workdir', '/src',
    )


def run_hook(
        prefix: Prefix,
        entry: str,
        args: Sequence[str],
        file_args: Sequence[str],
        *,
        is_local: bool,
        require_serial: bool,
        color: bool,
) -> tuple[int, bytes]:  # pragma: win32 no cover
    # Rebuild the docker image in case it has gone missing, as many people do
    # automated cleanup of docker images.
    build_docker_image(prefix, pull=False)

    entry_exe, *cmd_rest = lang_base.hook_cmd(entry, args)

    entry_tag = ('--entrypoint', entry_exe, docker_tag(prefix))
    return lang_base.run_xargs(
        (*docker_cmd(color=color), *entry_tag, *cmd_rest),
        file_args,
        require_serial=require_serial,
        color=color,
    )