File: _engine.py

package info (click to toggle)
git-delete-merged-branches 7.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 300 kB
  • sloc: python: 1,843; sh: 26; makefile: 2
file content (598 lines) | stat: -rw-r--r-- 24,604 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
# Copyright (C) 2020 Sebastian Pipping <sebastian@pipping.org>
# Licensed under GPL v3 or later

import os
import re
from functools import partial, reduce
from operator import and_
from subprocess import CalledProcessError
from typing import Optional

from ._git import CheckoutFailed, MergeBaseFailed, PullFailed
from ._metadata import APP


class _DmbException(Exception):
    """Common base class of the exceptions defined in this module."""


class _GitRepositoryWithoutBranches(_DmbException):
    """
    Raised for a repository that does not have a single branch yet.

    That is the state between "git init" and the first "git commit",
    where "git branch" will tell us that there are no branches.
    """

    def __init__(self):
        message = "This Git repository does not have any branches."
        super().__init__(message)


class _NoSuchBranchException(_DmbException):
    """Raised when a given branch name does not refer to an existing branch."""

    def __init__(self, branch_name):
        message = f"There is no branch {branch_name!r}."
        super().__init__(message)


class _NoSuchRemoteException(_DmbException):
    """Raised when a given remote name does not refer to an existing remote."""

    def __init__(self, remote_name):
        message = f"There is no remote {remote_name!r}."
        super().__init__(message)


class _TooFewOptionsAvailable(_DmbException):
    """Raised when fewer names are offered for selection than must be selected."""


class _ZeroMergeTargetsException(_DmbException):
    """Raised when no existing required target branch is left to check against."""

    def __init__(self):
        message = "One or more existing target branch is required."
        super().__init__(message)


class _InvalidRegexPattern(_DmbException):
    """Raised when a pattern cannot be compiled by Python module "re"."""

    def __init__(self, pattern):
        message = (
            f'Pattern "{pattern}" is not well-formed regular expression syntax '
            '(with regard to Python module "re").'
        )
        super().__init__(message)


class _CannotDryRunConfigurationError(_DmbException):
    """
    Raised when (re)configuration would be needed during a dry run.

    The message depends on whether configuration was explicitly requested
    (``force_reconfiguration`` is true) or merely found to be necessary.
    """

    def __init__(self, force_reconfiguration):
        if force_reconfiguration:
            message = "Arguments --configure and --dry-run are not compatible."
        else:
            message = (
                f"{APP}' need for configuration and argument --dry-run are not compatible."
            )
        super().__init__(message)


class DeleteMergedBranches:
    """
    Engine that configures (through Git config), detects and deletes
    local and remote branches that have been merged into all required
    target branches.
    """

    # Git config key/value pair that marks a repository as configured;
    # written by ``_configure`` and checked by ``_is_configured``.
    _CONFIG_KEY_CONFIGURED = "delete-merged-branches.configured"
    _CONFIG_VALUE_CONFIGURED = "5.0.0+"  # i.e. most ancient version with compatible config
    # Value that a per-remote or per-branch config key must have to count as set
    _CONFIG_VALUE_TRUE = "true"
    # Regular expressions matching per-remote / per-branch Git config keys;
    # named group "name" captures the remote or branch name (no spaces allowed).
    _PATTERN_REMOTE_ENABLED = "^remote\\.(?P<name>[^ ]+)\\.dmb-enabled$"
    _PATTERN_BRANCH_EXCLUDED = "^branch\\.(?P<name>[^ ]+)\\.dmb-excluded$"
    _PATTERN_BRANCH_REQUIRED = "^branch\\.(?P<name>[^ ]+)\\.dmb-required$"
    # ``str.format`` templates producing the very keys matched by the patterns above
    _FORMAT_REMOTE_ENABLED = "remote.{name}.dmb-enabled"
    _FORMAT_BRANCH_EXCLUDED = "branch.{name}.dmb-excluded"
    _FORMAT_BRANCH_REQUIRED = "branch.{name}.dmb-required"

    def __init__(self, git, messenger, confirmation, selector, effort_level):
        self._confirmation = confirmation
        self._messenger = messenger
        self._git = git
        self._selector = selector
        self._effort_using_git_cherry = effort_level >= 2
        self._effort_using_squashed_copies = effort_level >= 3

    def _interactively_edit_list(
        self, description, valid_names, old_names, format, min_selection_count
    ) -> set[str]:
        if len(valid_names) < min_selection_count:
            raise _TooFewOptionsAvailable

        help = (
            "(Press [Space] to toggle selection, [Enter]/[Return] to accept, [Ctrl]+[C] to quit.)"
        )

        old_names = set(old_names)
        initial_selection = [i for i, name in enumerate(valid_names) if name in old_names]
        if valid_names:
            new_names = set(
                self._selector(
                    self._messenger,
                    valid_names,
                    initial_selection,
                    description,
                    help,
                    min_selection_count,
                )
            )
        else:
            new_names = set()
        assert len(new_names) >= min_selection_count
        names_to_remove = old_names - new_names
        names_to_add = new_names - old_names

        for names, new_value in ((names_to_remove, None), (names_to_add, self._CONFIG_VALUE_TRUE)):
            for name in names:
                key = format.format(name=name)
                self._git.set_config(key, new_value)

        return new_names

    def _configure_required_branches(self, git_config) -> set[str]:
        try:
            return self._interactively_edit_list(
                "[1/3] For a branch to be considered"
                " fully merged, which other branches"
                " must it have been merged to?",
                self._git.find_local_branches(),
                self.find_required_branches(git_config),
                self._FORMAT_BRANCH_REQUIRED,
                min_selection_count=1,
            )
        except _TooFewOptionsAvailable:
            raise _GitRepositoryWithoutBranches

    def _configure_excluded_branches(self, git_config, new_required_branches: set[str]):
        valid_names = sorted(set(self._git.find_all_branch_names()) - new_required_branches)
        self._interactively_edit_list(
            "[2/3] Which of these branches (if any) should be kept around at all times?",
            valid_names,
            self.find_excluded_branches(git_config),
            self._FORMAT_BRANCH_EXCLUDED,
            min_selection_count=0,
        )

    def _configure_enabled_remotes(self, git_config):
        self._interactively_edit_list(
            "[3/3] Which remotes (if any) do you want to enable deletion of merged branches for?",
            self._git.find_remotes(),
            self.find_enabled_remotes(git_config),
            self._FORMAT_REMOTE_ENABLED,
            min_selection_count=0,
        )

    def _configure(self, git_config):
        """
        Walk the user through all three configuration steps and
        finally mark the repository as configured in Git config.
        """
        current_directory = os.getcwd()
        repo_basename = os.path.basename(current_directory)
        self._messenger.tell_info(f"Configure {APP} for repository {repo_basename!r}:")

        # Step 2 needs the outcome of step 1 so that required branches are not offered again
        required_branches = self._configure_required_branches(git_config)
        self._configure_excluded_branches(git_config, required_branches)
        self._configure_enabled_remotes(git_config)

        self._git.set_config(self._CONFIG_KEY_CONFIGURED, self._CONFIG_VALUE_CONFIGURED)

    @classmethod
    def _is_configured(cls, git_config):
        return git_config.get(cls._CONFIG_KEY_CONFIGURED) == cls._CONFIG_VALUE_CONFIGURED

    def ensure_configured(self, force_reconfiguration):
        git_config = self._git.extract_git_config()
        if force_reconfiguration or not self._is_configured(git_config):
            if self._git.pretend:
                raise _CannotDryRunConfigurationError(force_reconfiguration)
            self._configure(git_config)
            git_config = self._git.extract_git_config()
        assert self._is_configured(git_config)
        return git_config

    @classmethod
    def _filter_git_config(cls, git_config, pattern):
        matcher = re.compile(pattern)
        matched_names = []
        for key, value in git_config.items():
            match = matcher.match(key)
            if match and value == cls._CONFIG_VALUE_TRUE:
                matched_names.append(match.group("name"))
        return matched_names

    @classmethod
    def find_excluded_branches(cls, git_config):
        return cls._filter_git_config(git_config, cls._PATTERN_BRANCH_EXCLUDED)

    @classmethod
    def find_required_branches(cls, git_config):
        return cls._filter_git_config(git_config, cls._PATTERN_BRANCH_REQUIRED)

    @classmethod
    def find_enabled_remotes(cls, git_config):
        return cls._filter_git_config(git_config, cls._PATTERN_REMOTE_ENABLED)

    def _find_branches_merged_using_git_branch_merged(
        self, required_target_branches, remote_name: Optional[str]
    ) -> set[str]:
        if remote_name is None:
            find_branches_that_were_merged_into = self._git.find_merged_local_branches_for
        else:
            find_branches_that_were_merged_into = partial(
                self._git.find_merged_remote_branches_for, remote_name
            )

        if len(required_target_branches) == 1:
            target_branch = next(iter(required_target_branches))
            branches_merged_to_all_required_targets = set(
                find_branches_that_were_merged_into(target_branch)
            )
        else:
            branches_merged_to_all_required_targets = reduce(
                and_,
                (
                    set(find_branches_that_were_merged_into(target_branch))
                    for target_branch in required_target_branches
                ),
            )

        return branches_merged_to_all_required_targets

    def _has_been_squash_merged_into(self, target_branch, topic_branch) -> bool:
        """
        Tries to detect a squashed merge, i.e. where a single commit
        on the target branch pulls in the sum of all commits
        between the common merge base commit and the tip of the topic branch.

        The implementation creates a temporary squashed copy of those commits
        and then asks ``git cherry`` if that squashed commit has an equivalent
        on the target branch.
        """
        try:
            merge_base_commit_sha1 = self._git.merge_base(target_branch, topic_branch)
        except MergeBaseFailed:
            return False
        squash_merge_commit_sha1 = self._git.commit_tree(
            message=f"Squash-merge {topic_branch!r}",
            parent_committish=merge_base_commit_sha1,
            tree=topic_branch + "^{tree}",
        )

        cherry_lines = self._git.cherry(target_branch, squash_merge_commit_sha1)
        defacto_merged_into_target = all(line.startswith("-") for line in cherry_lines)
        return defacto_merged_into_target

    def _find_branches_merged_using_git_cherry(
        self, required_target_branches, candidate_branches
    ) -> set[str]:
        assert required_target_branches

        if not candidate_branches:
            return set()

        branches_merged_to_all_required_targets = set()
        candidates_for_squashed_merges = []

        for topic_branch in sorted(candidate_branches):
            assert topic_branch not in required_target_branches

            for target_branch in sorted(required_target_branches):
                cherry_lines = self._git.cherry(target_branch, topic_branch)
                defacto_merged_into_target = all(line.startswith("-") for line in cherry_lines)
                if not defacto_merged_into_target:
                    if len(cherry_lines) > 1:
                        candidates_for_squashed_merges.append(topic_branch)
                    break
            else:  # i.e. no break happened above
                branches_merged_to_all_required_targets.add(topic_branch)

        if self._effort_using_squashed_copies:
            check_for_squash_merges = True

            if candidates_for_squashed_merges:
                if self._git.has_detached_heads():
                    self._messenger.tell_info(
                        "Skipped further inspection of branches because of detached HEAD."
                    )
                    check_for_squash_merges = False

                if check_for_squash_merges:
                    if self._git.has_uncommitted_changes():
                        self._messenger.tell_info(
                            "Skipped further inspection of branches due to uncommitted changes."
                        )
                        check_for_squash_merges = False

            if check_for_squash_merges:
                for topic_branch in candidates_for_squashed_merges:
                    for target_branch in sorted(required_target_branches):
                        defacto_merged_into_target = self._has_been_squash_merged_into(
                            target_branch=target_branch, topic_branch=topic_branch
                        )
                        if not defacto_merged_into_target:
                            break
                    else:  # i.e. no break happened above
                        branches_merged_to_all_required_targets.add(topic_branch)

        return branches_merged_to_all_required_targets

    def _find_branches_merged_to_all_targets_for_single_remote(
        self, required_target_branches, excluded_branches: set[str], remote_name: Optional[str]
    ) -> tuple[set[str], set[str]]:
        if remote_name is not None:
            excluded_branches = {
                f"{remote_name}/{branch_name}" for branch_name in excluded_branches
            }

        truly_merged_branches = (
            self._find_branches_merged_using_git_branch_merged(
                required_target_branches, remote_name=remote_name
            )
        ) - excluded_branches

        if self._effort_using_git_cherry:
            if remote_name is None:
                all_branches_at_remote = set(self._git.find_local_branches())
            else:
                all_branches_at_remote = set(self._git.find_remote_branches_at(remote_name))
            if remote_name is not None:
                required_target_branches = {
                    f"{remote_name}/{branch_name}" for branch_name in required_target_branches
                }
            branches_to_inspect_using_git_cherry = (
                all_branches_at_remote
                - required_target_branches
                - excluded_branches
                - truly_merged_branches
            )
            defacto_merged_branches = self._find_branches_merged_using_git_cherry(
                required_target_branches, branches_to_inspect_using_git_cherry
            )
        else:
            defacto_merged_branches = set()

        return (truly_merged_branches, defacto_merged_branches)

    def _report_branches_as_deleted(self, branch_names: set[str], remote_name: str = None):
        branch_type = "local" if (remote_name is None) else "remote"
        info_text = f"{len(branch_names)} {branch_type} branch(es) deleted."
        self._messenger.tell_info(info_text)

    def _delete_local_merged_branches_for(self, required_target_branches, excluded_branches):
        """
        Find merged local branches and delete them after confirmation by the user.

        Branches that are checked out in some working tree are skipped (with a note
        to the user).  Branches detected by means other than ``git branch --merged``
        are deleted with ``force=True``.
        """
        for checked_out_branch in self._git.find_working_tree_branches():
            if (
                checked_out_branch is None
                or checked_out_branch in required_target_branches
                or checked_out_branch in excluded_branches
            ):
                # Not subject to analysis to begin with, so nothing to skip or report
                continue

            excluded_branches = excluded_branches | {checked_out_branch}
            self._messenger.tell_info(
                f"Skipped branch {checked_out_branch!r} because it is currently checked out."
            )

        truly_merged, defacto_merged = self._find_branches_merged_to_all_targets_for_single_remote(
            required_target_branches, excluded_branches, remote_name=None
        )
        local_branches_to_delete = truly_merged | defacto_merged

        if not local_branches_to_delete:
            self._messenger.tell_info("No local branches deleted.")
            return

        bullet_list = "\n".join(f"  - {name}" for name in sorted(local_branches_to_delete))
        description = (
            f"You are about to delete {len(local_branches_to_delete)}"
            " local branch(es):\n"
            f"{bullet_list}"
            "\n\nDelete?"
        )
        if not self._confirmation.confirmed(description):
            return

        # NOTE: With regard to reporting, the idea is to
        #       - report all deleted local branches at once when deletion was successful, and to
        #       - not silence partial success
        #         when the first delete call was successful and the second call was not.
        self._git.delete_local_branches(truly_merged)
        try:
            self._git.delete_local_branches(defacto_merged, force=True)
        except CalledProcessError:
            self._report_branches_as_deleted(truly_merged)
            raise

        self._report_branches_as_deleted(local_branches_to_delete)

    def _delete_remote_merged_branches_for(
        self, required_target_branches, excluded_branches, remote_name, all_branch_refs: set[str]
    ):
        if not all(
            (f"{remote_name}/{branch_name}" in all_branch_refs)
            for branch_name in required_target_branches
        ):
            self._messenger.tell_info(
                f"Skipped remote {remote_name!r} as it does not have all required branches."
            )
            return

        truly_merged, defacto_merged = self._find_branches_merged_to_all_targets_for_single_remote(
            required_target_branches, excluded_branches, remote_name=remote_name
        )
        remote_branches_to_delete = [
            b for b in (truly_merged | defacto_merged) if b.startswith(f"{remote_name}/")
        ]

        if not remote_branches_to_delete:
            self._messenger.tell_info("No remote branches deleted.")
            return

        description = (
            f"You are about to delete {len(remote_branches_to_delete)} "
            "remote branch(es):\n"
            + "\n".join(f"  - {name}" for name in sorted(remote_branches_to_delete))
            + "\n\nDelete?"
        )
        if not self._confirmation.confirmed(description):
            return

        self._git.delete_remote_branches(remote_branches_to_delete, remote_name)
        self._report_branches_as_deleted(remote_branches_to_delete, remote_name)

    def refresh_remotes(self, enabled_remotes):
        sorted_remotes = sorted(set(enabled_remotes))
        if not sorted_remotes:
            return

        description = (
            f'Do you want to run "git remote update --prune"'
            f" for {len(sorted_remotes)} remote(s):\n"
            + "\n".join(f"  - {name}" for name in sorted_remotes)
            + "\n\nUpdate?"
        )
        if not self._confirmation.confirmed(description):
            return

        for remote_name in sorted_remotes:
            self._git.update_and_prune_remote(remote_name)

    def detect_stale_remotes(self, enabled_remotes, required_target_branches):
        sorted_remotes = sorted(set(enabled_remotes))
        if not sorted_remotes:
            return

        sorted_required_target_branches = sorted(set(required_target_branches))
        assert sorted_required_target_branches

        for remote_name in enabled_remotes:
            remote_branches = set(self._git.find_remote_branches_at(remote_name))
            not_fully_pushed_branches = [
                branch
                for branch in sorted_required_target_branches
                if f"{remote_name}/{branch}" in remote_branches
                and self._git.has_unpushed_commits_on(
                    branch, with_regard_to=f"{remote_name}/{branch}"
                )
            ]

            if not_fully_pushed_branches:
                self._messenger.tell_info(
                    (
                        f"Remote {remote_name!r} is not up to date with"
                        f" {len(not_fully_pushed_branches)} local"
                        " branch(es):\n"
                    )
                    + "\n".join(f"  - {branch}" for branch in not_fully_pushed_branches)
                    + (
                        "\n\nThis will likely impair detection"
                        f" of merged branches for remote {remote_name!r}."
                        "\nPlease consider getting it back in sync"
                        " by running\n"
                    )
                    + "\n".join(
                        f"  $ git push {remote_name} {branch}"
                        for branch in not_fully_pushed_branches
                    )
                    + f"\n\nand then invoking {APP}, again."
                )

    def refresh_target_branches(self, required_target_branches):
        sorted_branches = sorted(set(required_target_branches))
        if not sorted_branches:
            return

        initial_branch = self._git.find_current_branch()
        if initial_branch is None or self._git.has_detached_heads():
            self._messenger.tell_info("Skipped refreshing branches because of detached HEAD.")
            return

        if self._git.has_uncommitted_changes():
            self._messenger.tell_info("Skipped refreshing branches due to uncommitted changes.")
            return

        description = (
            f'Do you want to run "git pull --ff-only"'
            f" for {len(sorted_branches)} branch(es):\n"
            + "\n".join(f"  - {name}" for name in sorted_branches)
            + "\n\nPull?"
        )
        if not self._confirmation.confirmed(description):
            return

        needs_a_switch_back = False
        try:
            for branch_name in sorted_branches:
                if branch_name != initial_branch:
                    try:
                        self._git.checkout(branch_name)
                    except CheckoutFailed:
                        self._messenger.tell_error(
                            f"Refreshing local branch {branch_name!r}"
                            " failed"
                            " because the branch cannot be checkout out."
                        )
                        continue
                    needs_a_switch_back = True

                try:
                    self._git.pull_ff_only()
                except PullFailed:
                    self._messenger.tell_error(
                        f"Refreshing local branch {branch_name!r} failed"
                        " because the branch cannot be pulled"
                        " with fast forward."
                    )
        finally:
            if needs_a_switch_back:
                self._git.checkout(initial_branch)

    def delete_merged_branches(self, required_target_branches, excluded_branches, enabled_remotes):
        self._delete_local_merged_branches_for(required_target_branches, excluded_branches)
        all_branch_refs = set(self._git.find_all_branch_refs())
        for remote_name in enabled_remotes:
            self._delete_remote_merged_branches_for(
                required_target_branches, excluded_branches, remote_name, all_branch_refs
            )

    def determine_excluded_branches(
        self, git_config: dict, excluded_branches: list[str], included_branches_patterns: list[str]
    ) -> set[str]:
        existing_branches = set(self._git.find_all_branch_names())
        if excluded_branches:
            excluded_branches_set = set(excluded_branches)
            invalid_branches = excluded_branches_set - existing_branches
            if invalid_branches:
                raise _NoSuchBranchException(sorted(invalid_branches)[0])
        else:
            excluded_branches_set = set()

        excluded_branches_set |= set(self.find_excluded_branches(git_config)) & existing_branches

        # The inclusion patterns are meant to work in logical conjunction ("and") but an empty
        # list should not exclude any branches.  So we'll add any existing branch to the exclusion
        # set that fails to match any of the inclusion patterns:
        for included_branches_pattern in included_branches_patterns:
            try:
                matcher = re.compile(included_branches_pattern)
            except re.error:
                raise _InvalidRegexPattern(included_branches_pattern)

            for branch_name in existing_branches:
                if matcher.search(branch_name):
                    continue
                excluded_branches_set.add(branch_name)

        return excluded_branches_set

    def determine_required_target_branches(
        self, git_config: dict, required_target_branches: list[str]
    ):
        existing_branches = set(self._git.find_local_branches())
        if required_target_branches:
            required_target_branches_set = set(required_target_branches)
            invalid_branches = required_target_branches_set - existing_branches
            if invalid_branches:
                raise _NoSuchBranchException(required_target_branches[0])
        else:
            required_target_branches_set = (
                set(self.find_required_branches(git_config)) & existing_branches
            )

        if not required_target_branches_set:
            raise _ZeroMergeTargetsException

        return required_target_branches_set

    def determine_enabled_remotes(self, git_config: dict, enabled_remotes: list[str]):
        existing_remotes = set(self._git.find_remotes())
        if enabled_remotes:
            enabled_remotes_set = set(enabled_remotes)
            invalid_remotes = enabled_remotes_set - existing_remotes
            if invalid_remotes:
                raise _NoSuchRemoteException(enabled_remotes[0])
            return enabled_remotes_set
        else:
            return set(self.find_enabled_remotes(git_config)) & existing_remotes