File: git.py

package info (click to toggle)
git-cola 4.13.0-1
  • links: PTS
  • area: main
  • in suites: sid
  • size: 6,480 kB
  • sloc: python: 36,938; sh: 304; makefile: 223; xml: 100; tcl: 62
file content (463 lines) | stat: -rw-r--r-- 14,759 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
from functools import partial
import errno
import os
from os.path import join
import subprocess
import threading
import time

from . import core
from .compat import int_types
from .compat import ustr
from .compat import WIN32
from .decorators import memoize
from .interaction import Interaction


# Tracing mode, read once from the GIT_COLA_TRACE environment variable.
# Git.execute() recognizes 'trace' and 'full'; any other non-empty value makes
# it print a short timing line for every command.
GIT_COLA_TRACE = core.getenv('GIT_COLA_TRACE', '')
# The git executable to run. The GIT_COLA_GIT environment variable overrides
# the default of 'git'.
GIT = core.getenv('GIT_COLA_GIT', 'git')
# NOTE(review): presumably the positions of each field in the
# (status, out, err) tuples returned by Git.execute() -- confirm against callers.
STATUS = 0
STDOUT = 1
STDERR = 2

# Object ID / SHA-1 / SHA-256-related constants
# Git's empty tree is a built-in constant object name.
EMPTY_TREE_OID = '4b825dc642cb6eb9a060e54bf8d69288fbee4904'
# Git's diff machinery returns zeroes for modified files whose content exists
# in the worktree only.
MISSING_BLOB_OID = '0000000000000000000000000000000000000000'
# Git's SHA-1 object IDs are 40 characters long (20 bytes).
# Git's SHA-256 object IDs are 64 characters long (32 bytes).
# OID_LENGTH will need to change when Git moves away from SHA-1.
# When that happens we'll have to detect and update this at runtime in
# order to support both old and new git.
OID_LENGTH = 40
OID_LENGTH_SHA256 = 64

# Held by Git.execute() around every command that is not flagged _readonly so
# that only one such command runs at a time.
_index_lock = threading.Lock()


def dashify(value):
    """Return `value` with every underscore turned into a dash"""
    pieces = value.split('_')
    return '-'.join(pieces)


def is_git_dir(git_dir):
    """Return True when `git_dir` looks like a git directory or a .git-file

    Modeled after git's setup.c:is_git_directory().

    """
    if not git_dir:
        return False

    # Layout 1: a directory that contains both "objects" and "refs" directories.
    has_layout = (
        core.isdir(git_dir)
        and core.isdir(join(git_dir, 'objects'))
        and core.isdir(join(git_dir, 'refs'))
    )
    # Layout 2: a directory that contains "gitdir" and "commondir" files.
    if not has_layout:
        has_layout = core.isfile(join(git_dir, 'gitdir')) and core.isfile(
            join(git_dir, 'commondir')
        )
    # Neither layout matched; the path can still be a ".git" file.
    if not has_layout:
        return is_git_file(git_dir)

    # HEAD must be a regular file, or a symlink that points into "refs/".
    head = join(git_dir, 'HEAD')
    return core.isfile(head) or (
        core.islink(head) and core.readlink(head).startswith('refs/')
    )


def is_git_file(filename):
    """Return True when `filename` is an existing file whose name is ".git" """
    exists_as_file = core.isfile(filename)
    # The name is only inspected when the path exists as a file.
    return exists_as_file and os.path.split(filename)[1] == '.git'


def is_git_worktree(dirname):
    """Return True when `dirname` contains a ".git" entry accepted by is_git_dir()"""
    dot_git = join(dirname, '.git')
    return is_git_dir(dot_git)


def is_git_repository(path):
    """Return True when `path` is a git worktree or is itself a git directory"""
    found = is_git_worktree(path)
    if not found:
        # Not a worktree; `path` may still be the git directory itself.
        found = is_git_dir(path)
    return found


def read_git_file(path):
    """Return the git directory that a .git-file points to

    The file content must start with "gitdir: ". A relative target is resolved
    against the folder that contains the .git-file and then normalized.

    `None` is returned when <path> is not a .git-file, or when its content does
    not start with the expected header.

    """
    if not path or not is_git_file(path):
        return None

    prefix = 'gitdir: '
    contents = core.read(path).strip()
    if not contents.startswith(prefix):
        return None

    target = contents[len(prefix) :]
    if target and not os.path.isabs(target):
        containing_folder = os.path.dirname(path)
        target = os.path.normpath(join(containing_folder, target))
    return target


class Paths:
    """Git repository paths of interest

    Holds the git directory, the .git-file that pointed at it (if any), the
    worktree, and the common directory read from a "commondir" file.

    """

    def __init__(self, git_dir=None, git_file=None, worktree=None, common_dir=None):
        """Store the initial paths

        A `git_dir` that is_git_dir() rejects is discarded and stored as None.

        """
        if git_dir and not is_git_dir(git_dir):
            git_dir = None
        self.git_dir = git_dir
        self.git_file = git_file
        self.worktree = worktree
        self.common_dir = common_dir

    def get(self, path):
        """Search for git worktrees and bare repositories

        The search starts at `path` and only runs when the git directory or the
        worktree is still unknown. Afterwards a git_dir that is a .git-file is
        replaced by the directory it points to.

        Returns `self` so that calls can be chained.

        """
        if not self.git_dir or not self.worktree:
            # Entries from GIT_CEILING_DIRECTORIES stop the upward search.
            # Empty entries are dropped.
            ceiling_dirs = set()
            ceiling = core.getenv('GIT_CEILING_DIRECTORIES')
            if ceiling:
                ceiling_dirs.update([x for x in ceiling.split(os.pathsep) if x])
            if path:
                path = core.abspath(path)
            self._search_for_git(path, ceiling_dirs)

        if self.git_dir:
            # read_git_file() returns None unless git_dir is a .git-file.
            git_dir_path = read_git_file(self.git_dir)
            if git_dir_path:
                # Remember the .git-file and switch git_dir to its target.
                self.git_file = self.git_dir
                self.git_dir = git_dir_path

                # A "commondir" file inside the git directory provides
                # common_dir. Relative values are resolved against git_dir_path.
                commondir_file = join(git_dir_path, 'commondir')
                if core.exists(commondir_file):
                    common_path = core.read(commondir_file).strip()
                    if common_path:
                        if os.path.isabs(common_path):
                            common_dir = common_path
                        else:
                            common_dir = join(git_dir_path, common_path)
                            common_dir = os.path.normpath(common_dir)
                        self.common_dir = common_dir
        # Return self to allow chaining, e.g. Paths().get(path)
        return self

    def _search_for_git(self, path, ceiling_dirs):
        """Search for git repositories located at path or above

        Fills in self.git_dir and self.worktree when they are not already set.
        The walk moves up one directory per iteration and stops at the first
        match, at a directory listed in `ceiling_dirs`, or when os.path.split()
        yields an empty final component.

        """
        while path:
            # Ceiling directories end the search without a result.
            if path in ceiling_dirs:
                break
            # `path` itself is accepted by is_git_dir().
            if is_git_dir(path):
                if not self.git_dir:
                    self.git_dir = path
                basename = os.path.basename(path)
                # A path named ".git" has its parent directory as the worktree.
                if not self.worktree and basename == '.git':
                    self.worktree = os.path.dirname(path)
            # We are either in a bare repository, or someone set GIT_DIR
            # but did not set GIT_WORK_TREE.
            if self.git_dir:
                if not self.worktree:
                    basename = os.path.basename(self.git_dir)
                    if basename == '.git':
                        self.worktree = os.path.dirname(self.git_dir)
                    elif path and not is_git_dir(path):
                        # git_dir was already known and `path` is not a git
                        # directory, so `path` is used as the worktree.
                        self.worktree = path
                break
            # Look for a ".git" entry directly inside `path`.
            gitpath = join(path, '.git')
            if is_git_dir(gitpath):
                if not self.git_dir:
                    self.git_dir = gitpath
                if not self.worktree:
                    self.worktree = path
                break
            # Move up to the parent directory. Stop once there is no final
            # path component left to strip.
            path, dummy = os.path.split(path)
            if not dummy:
                break


def find_git_directory(path):
    """Discover the repository for `path`, seeded by GIT_DIR and GIT_WORK_TREE

    Returns the Paths instance that holds the result of the search.

    """
    env_git_dir = core.getenv('GIT_DIR')
    env_worktree = core.getenv('GIT_WORK_TREE')
    paths = Paths(git_dir=env_git_dir, worktree=env_worktree)
    return paths.get(path)


class Git:
    """
    The Git class manages communication with the Git binary

    Attributes that are not defined on the class and do not start with an
    underscore are resolved to git subcommands, so that
    ``git.rev_parse('HEAD')`` runs ``git rev-parse HEAD`` through git().
    """

    def __init__(self, worktree=None):
        """Discover the repository at `worktree`, or at the current directory"""
        self.paths = Paths()

        self._valid = {}  #: Store the result of is_git_dir() for performance
        self.set_worktree(worktree or core.getcwd())

    def is_git_repository(self, path):
        """Return True when `path` is a git worktree or a git directory"""
        return is_git_repository(path)

    def getcwd(self):
        """Return the working directory used by git()"""
        return self.paths.worktree or self.paths.git_dir

    def set_worktree(self, path):
        """Run repository discovery from `path` and return the worktree found"""
        path = core.decode(path)
        self.paths = find_git_directory(path)
        return self.paths.worktree

    def worktree(self):
        """Return the worktree, searching from the current directory if unset"""
        if not self.paths.worktree:
            path = core.abspath(core.getcwd())
            self.paths = find_git_directory(path)
        return self.paths.worktree

    def is_valid(self):
        """Is this a valid git repository?

        Cache the result to avoid hitting the filesystem.

        """
        git_dir = self.paths.git_dir
        try:
            # An unset git_dir is never valid and is not looked up in the cache.
            valid = bool(git_dir) and self._valid[git_dir]
        except KeyError:
            valid = self._valid[git_dir] = is_git_dir(git_dir)

        return valid

    def git_path(self, *paths):
        """Return the path to `paths` inside the git directory

        When the path does not exist below git_dir but does exist below the
        common directory, the common directory's path is returned instead.

        `None` is returned when the git directory is unknown.

        """
        result = None
        if self.paths.git_dir:
            result = join(self.paths.git_dir, *paths)
        if result and self.paths.common_dir and not core.exists(result):
            common_result = join(self.paths.common_dir, *paths)
            if core.exists(common_result):
                result = common_result
        return result

    def git_dir(self):
        """Return the git directory, searching from the current directory if unset"""
        if not self.paths.git_dir:
            path = core.abspath(core.getcwd())
            self.paths = find_git_directory(path)
        return self.paths.git_dir

    def __getattr__(self, name):
        """Expose git subcommands as methods, e.g. ``git.rev_parse(...)``

        The generated callable is stored on the instance so that later lookups
        of the same name no longer reach __getattr__.

        :raises AttributeError: when `name` starts with an underscore.

        """
        # Underscore-prefixed names are private or special attributes being
        # probed (e.g. hasattr(git, '__deepcopy__')). dashify() would turn them
        # into a "subcommand" that starts with a dash, which can never be a git
        # command, so report them as missing instead of caching a bogus method.
        if name.startswith('_'):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )
        git_cmd = partial(self.git, name)
        setattr(self, name, git_cmd)
        return git_cmd

    @staticmethod
    def execute(
        command,
        _add_env=None,
        _cwd=None,
        _decode=True,
        _encoding=None,
        _raw=False,
        _stdin=None,
        _stderr=subprocess.PIPE,
        _stdout=subprocess.PIPE,
        _readonly=False,
        _no_win32_startupinfo=False,
    ):
        """
        Execute a command and returns its output

        :param command: argument list to execute.
        :param _add_env: forwarded to core.run_command() as `add_env`.
        :param _cwd: working directory, defaults to the current directory.
        :param _decode: whether to decode output, defaults to True.
            This value is accepted but is not consulted by this method.
        :param _encoding: default encoding, defaults to None (utf-8).
        :param _readonly: avoid taking the index lock. Assume the command is read-only.
        :param _raw: do not strip trailing newlines from stdout.
        :param _stdin: optional stdin filehandle.
        :param _stderr: forwarded to core.run_command() as `stderr`.
        :param _stdout: forwarded to core.run_command() as `stdout`.
        :param _no_win32_startupinfo: forwarded to core.run_command() as
            `no_win32_startupinfo`.
        :returns (status, out, err): exit status, stdout, stderr

        """
        # Allow the user to have the command executed in their working dir.
        if not _cwd:
            _cwd = core.getcwd()

        extra = {}

        if hasattr(os, 'setsid'):
            # SSH uses the SSH_ASKPASS variable only if the process is really
            # detached from the TTY (stdin redirection and setting the
            # SSH_ASKPASS environment variable is not enough).  To detach a
            # process from the console it should fork and call os.setsid().
            # NOTE(review): the subprocess documentation warns that preexec_fn
            # is not safe in threaded programs and offers start_new_session as
            # the replacement -- confirm how core.run_command() forwards it.
            extra['preexec_fn'] = os.setsid

        start_time = time.time()

        # Start the process
        # Guard against thread-unsafe .git/index.lock files
        if not _readonly:
            _index_lock.acquire()
        try:
            status, out, err = core.run_command(
                command,
                add_env=_add_env,
                cwd=_cwd,
                encoding=_encoding,
                stdin=_stdin,
                stdout=_stdout,
                stderr=_stderr,
                no_win32_startupinfo=_no_win32_startupinfo,
                **extra,
            )
        finally:
            # Let the next thread in
            if not _readonly:
                _index_lock.release()

        end_time = time.time()
        # abs() keeps the reported duration non-negative.
        elapsed_time = abs(end_time - start_time)

        if not _raw and out is not None:
            # Strip trailing newlines while keeping the output's encoding.
            out = core.UStr(out.rstrip('\n'), out.encoding)

        cola_trace = GIT_COLA_TRACE
        if cola_trace == 'trace':
            # Timing and command line go to the application's status log.
            msg = f'trace: {elapsed_time:.3f}s: {core.list2cmdline(command)}'
            Interaction.log_status(status, msg, '')
        elif cola_trace == 'full':
            # Timing, command, exit status and any output go to stderr.
            if out or err:
                core.print_stderr(
                    "# %.3fs: %s -> %d: '%s' '%s'"
                    % (elapsed_time, ' '.join(command), status, out, err)
                )
            else:
                core.print_stderr(
                    '# %.3fs: %s -> %d' % (elapsed_time, ' '.join(command), status)
                )
        elif cola_trace:
            # Any other non-empty value prints the timing and command only.
            core.print_stderr('# {:.3f}s: {}'.format(elapsed_time, ' '.join(command)))

        # Allow access to the command's status code
        return (status, out, err)

    def git(self, cmd, *args, **kwargs):
        """Run the git subcommand `cmd` and return (status, out, err)

        Keyword arguments named after execute()'s underscore-prefixed
        parameters are passed to execute(). All remaining keyword arguments
        become command-line options via transform_kwargs() and are placed
        before the positional `args`.

        When the command cannot be started (OSError) the result is
        ``(1, '', "error: unable to execute '<git>'")``. On Windows an ENOENT
        error is re-raised when git itself is installed.

        """
        # Handle optional arguments prior to calling transform_kwargs
        # otherwise they'll end up in args, which is bad.
        _kwargs = {'_cwd': self.getcwd()}
        execute_kwargs = (
            '_add_env',
            '_cwd',
            '_decode',
            '_encoding',
            '_stdin',
            '_stdout',
            '_stderr',
            '_raw',
            '_readonly',
            '_no_win32_startupinfo',
        )

        for kwarg in execute_kwargs:
            if kwarg in kwargs:
                _kwargs[kwarg] = kwargs.pop(kwarg)

        # Prepare the argument list
        git_args = [
            GIT,
            '-c',
            'diff.suppressBlankEmpty=false',
            '-c',
            'diff.autoRefreshIndex=false',
            '-c',
            'log.showSignature=false',
            dashify(cmd),
        ]
        opt_args = transform_kwargs(**kwargs)
        call = git_args + opt_args
        call.extend(args)
        try:
            result = self.execute(call, **_kwargs)
        except OSError as exc:
            if WIN32 and exc.errno == errno.ENOENT:
                # see if git exists at all. On win32 it can fail with ENOENT in
                # case of argv overflow. We should be safe from that but use
                # defensive coding for the worst-case scenario. On UNIX
                # we have ENAMETOOLONG but that doesn't exist on Windows.
                if _git_is_installed():
                    # Git exists, so the failure is something else: propagate
                    # the original exception with its traceback intact.
                    raise
                _print_win32_git_hint()
            result = (1, '', "error: unable to execute '%s'" % GIT)
        return result


def _git_is_installed():
    """Return True if git is installed

    Runs "<git> --version" and reports whether it exited with status zero.

    """
    # On win32 Git commands can fail with ENOENT in case of argv overflow. We
    # should be safe from that but use defensive coding for the worst-case
    # scenario. On UNIX we have ENAMETOOLONG but that doesn't exist on
    # Windows.
    try:
        exit_code, _out, _err = Git.execute([GIT, '--version'])
    except OSError:
        # The executable could not be started at all.
        return False
    return exit_code == 0


def transform_kwargs(**kwargs):
    """Transform kwargs into git command line options

    Callers can rely on these rules:

    foo=None is ignored, so that callers can use default values of None
    that are ignored unless set explicitly.

    foo=False is ignored for the same reason, and foo=True yields ['--foo'].

    foo={string-or-number} yields ['--foo=<value>'].

    Single-letter names use one dash and no equals sign, e.g. n=5 yields
    ['-n5']. Underscores in names become dashes.

    """
    stringifiable = (ustr, float, str) + int_types
    options = []

    for key, value in kwargs.items():
        is_short = len(key) == 1
        flag = ('-' if is_short else '--') + dashify(key)

        # isinstance(False, int) is True, so we have to check bool first
        if isinstance(value, bool):
            # False is ignored; flag=False inhibits --flag
            if value:
                options.append(flag)
            continue

        if isinstance(value, stringifiable):
            separator = '' if is_short else '='
            options.append(f'{flag}{separator}{value}')

    return options


def win32_git_error_hint():
    """Return the hint that explains how to configure a custom Git location"""
    lines = [
        '',
        'NOTE: If you have Git installed in a custom location, e.g.',
        r'C:\Tools\Git, then you can create a file at',
        '~/.config/git-cola/git-bindir with following text',
        'and git-cola will add the specified location to your $PATH',
        'automatically when starting cola:',
        '',
        r'C:\Tools\Git\bin',
    ]
    return '\n'.join(lines)


@memoize
def _print_win32_git_hint():
    """Print the "unable to execute 'git'" error and the Windows hint to stderr"""
    # NOTE(review): @memoize presumably limits this to printing once -- confirm.
    message = "error: unable to execute 'git'" + f'\n{win32_git_error_hint()}\n'
    core.print_stderr(message)


def create():
    """Return a new Git instance

    >>> git = create()
    >>> status, out, err = git.version()
    >>> 'git' == out[:3].lower()
    True

    """
    instance = Git()
    return instance