File: owners_client.py

package info (click to toggle)
chromium 139.0.7258.127-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 6,122,068 kB
  • sloc: cpp: 35,100,771; ansic: 7,163,530; javascript: 4,103,002; python: 1,436,920; asm: 946,517; xml: 746,709; pascal: 187,653; perl: 88,691; sh: 88,436; objc: 79,953; sql: 51,488; cs: 44,583; fortran: 24,137; makefile: 22,147; tcl: 15,277; php: 13,980; yacc: 8,984; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (227 lines) | stat: -rw-r--r-- 8,468 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# Copyright (c) 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os
import random

import gerrit_util
import git_common


class OwnersClient(object):
    """Interact with OWNERS files in a repository.

    This class allows you to interact with OWNERS files in a repository through
    both the Gerrit Code-Owners plugin REST API and the owners database
    implemented by Depot Tools in owners.py:

        - List all the owners for a group of files.
        - Check if files have been approved.
        - Suggest owners for a group of files.

    All code should use this class to interact with OWNERS files instead of the
    owners database in owners.py.

    Subclasses must implement ListOwners(); the other methods obtain their data
    through BatchListOwners(), which calls ListOwners() for every path.
    """
    # '*' means that everyone can approve.
    EVERYONE = '*'

    # Possible status of a file.
    # - INSUFFICIENT_REVIEWERS: The path needs owners approval, but none of its
    #   owners is currently a reviewer of the change.
    # - PENDING: An owner of this path has been added as reviewer, but approval
    #   has not been given yet.
    # - APPROVED: The path has been approved by an owner.
    APPROVED = 'APPROVED'
    PENDING = 'PENDING'
    INSUFFICIENT_REVIEWERS = 'INSUFFICIENT_REVIEWERS'

    def ListOwners(self, path):
        """List all owners for a file.

        The returned list is sorted so that better owners appear first.

        Raises:
            NotImplementedError: always; subclasses must override this method.
        """
        raise NotImplementedError('Not implemented')

    def BatchListOwners(self, paths):
        """List all owners for a group of files.

        Returns a dictionary {path: [owners]}.
        """
        if not paths:
            return {}
        # Never start more workers than there are paths, nor more than the
        # connection limit declared by gerrit_util.
        nproc = min(gerrit_util.MAX_CONCURRENT_CONNECTION, len(paths))
        with git_common.ScopedPool(nproc, kind='threads') as pool:
            return dict(
                pool.imap_unordered(lambda p: (p, self.ListOwners(p)), paths))

    def GetFilesApprovalStatus(self, paths, approvers, reviewers):
        """Check the approval status for the given paths.

        Utility method to check for approval status when a change has not yet
        been created, given reviewers and approvers.

        Returns a dictionary {path: status}, where status is one of APPROVED,
        PENDING or INSUFFICIENT_REVIEWERS:
            - APPROVED if at least one owner of the path is in |approvers|.
            - PENDING if no owner approved, but at least one owner of the path
              is in |reviewers|.
            - INSUFFICIENT_REVIEWERS otherwise.
        """
        # A path owned by EVERYONE is satisfied by any approver (or reviewer),
        # so the wildcard is added as soon as the corresponding set is
        # non-empty.
        approvers = set(approvers)
        if approvers:
            approvers.add(self.EVERYONE)
        reviewers = set(reviewers)
        if reviewers:
            reviewers.add(self.EVERYONE)
        status = {}
        owners_by_path = self.BatchListOwners(paths)
        for path, owners in owners_by_path.items():
            owners = set(owners)
            if owners.intersection(approvers):
                status[path] = self.APPROVED
            elif owners.intersection(reviewers):
                status[path] = self.PENDING
            else:
                status[path] = self.INSUFFICIENT_REVIEWERS
        return status

    def ScoreOwners(self, paths, exclude=None):
        """Get sorted list of owners for the given paths.

        The per-path owner lists are interleaved round-robin: first the best
        owner of every path, then the second best of every path, and so on.
        Each owner appears once, at the position of its first occurrence, and
        owners listed in |exclude| are left out.

        Returns an empty list when |paths| is empty or no owners are known.
        """
        if not paths:
            return []
        # A single set tracks both the excluded owners and the owners that
        # have already been emitted, so every membership test is O(1).
        skipped = set(exclude or [])
        queues = list(self.BatchListOwners(paths).values())
        # default=0 keeps an empty owners mapping from raising ValueError.
        longest = max((len(q) for q in queues), default=0)
        owners = []
        for i in range(longest):
            for q in queues:
                if i < len(q) and q[i] not in skipped:
                    skipped.add(q[i])
                    owners.append(q[i])
        return owners

    def SuggestOwners(self, paths, exclude=None):
        """Suggest a set of owners for the given paths.

        Walks the owners in ScoreOwners() order and selects every owner that
        covers at least one path not covered by a previously selected owner,
        stopping as soon as all paths are covered.

        Returns the list of selected owners; owners in |exclude| are never
        selected.
        """
        exclude = exclude or []

        paths_by_owner = {}
        owners_by_path = self.BatchListOwners(paths)
        for path, owners in owners_by_path.items():
            for owner in owners:
                paths_by_owner.setdefault(owner, set()).add(path)

        selected = []
        missing = set(paths)
        for owner in self.ScoreOwners(paths, exclude=exclude):
            missing_len = len(missing)
            missing.difference_update(paths_by_owner[owner])
            # Only keep owners that reduced the set of uncovered paths.
            if missing_len > len(missing):
                selected.append(owner)
            if not missing:
                break

        return selected

    def SuggestMinimalOwners(self,
                             paths: list[str],
                             exclude: list[str] = None) -> list[str]:
        """
        Suggest a set of owners for the given paths. Never return an owner in
        the |exclude| list.

        Aims to provide only one, but will provide more if it's unable to
        find a common owner.

        Returns an empty list when no owners are known for |paths|.
        """
        exclude = exclude or []

        owners_by_path = self.BatchListOwners(paths)
        if not owners_by_path:
            return []

        # Intersect the owners of every path without mutating the mapping
        # returned by BatchListOwners: it may be shared or cached by the
        # implementation.
        common_owners = set.intersection(
            *(set(owners) for owners in owners_by_path.values()))
        common_owners.difference_update(exclude)

        if not common_owners:
            # This likely means some of the files had `noparent` set.
            # Fall back to the default suggestion algorithm, which accounts
            # for noparent but is liable to return many different owners
            return self.SuggestOwners(paths, exclude)

        # Return an arbitrary common owner, preferring those with a good score
        sorted_common_owners = [
            owner for owner in self.ScoreOwners(paths, exclude=exclude)
            if owner in common_owners
        ]

        # Return a singleton list so this function has a consistent return type
        return sorted_common_owners[:1]


class GerritClient(OwnersClient):
    """Implement OwnersClient using OWNERS REST API.

    Owners are fetched through gerrit_util.GetOwnersForFile() and memoized per
    path for the lifetime of the client. Two separate caches are kept: one for
    the full owners list and one for the highest-scoring owners only.
    """
    def __init__(self, host, project, branch):
        super().__init__()

        self._host = host
        self._project = project
        self._branch = branch
        # {path: [owners]} for ListOwners() and ListBestOwners() respectively.
        self._owners_cache = {}
        self._best_owners_cache = {}

        # Seed used by Gerrit to shuffle code owners that have the same score.
        # Can be used to make the sort order stable across several requests,
        # e.g. to get the same set of random code owners for different file
        # paths that have the same code owners.
        self._seed = random.getrandbits(30)

    def _FetchOwners(self, path, cache, highest_score_only=False):
        """Return the owner emails for |path|, fetching them on a cache miss.

        Args:
            path: File path; OS-specific separators are converted to '/'.
            cache: Dictionary {path: [owners]} used to memoize the result.
            highest_score_only: Forwarded to gerrit_util.GetOwnersForFile().

        Returns:
            The list of owner emails stored in |cache| for |path|, with
            EVERYONE appended when the response reports owned_by_all_users.
        """
        # Always use slashes as separators.
        path = path.replace(os.sep, '/')
        if path in cache:
            return cache[path]

        # GetOwnersForFile returns a list of account details sorted by order
        # of best reviewer for path. If owners have the same score, the
        # order is random, seeded by `self._seed`.
        data = gerrit_util.GetOwnersForFile(
            self._host,
            self._project,
            self._branch,
            path,
            resolve_all_users=False,
            highest_score_only=highest_score_only,
            seed=self._seed)
        # Entries without an account email are skipped.
        owners = [
            d['account']['email'] for d in data['code_owners']
            if 'account' in d and 'email' in d['account']
        ]
        # If owned_by_all_users is true, add everyone as an owner at the end
        # of the owners list.
        if data.get('owned_by_all_users', False):
            owners.append(self.EVERYONE)
        # Store the list only once it is complete, so the cache never exposes
        # a partially built entry.
        cache[path] = owners
        return owners

    def ListOwners(self, path):
        """List all owners for a file, best owners first."""
        return self._FetchOwners(path, self._owners_cache)

    def ListBestOwners(self, path):
        """List only the highest-scoring owners for a file."""
        return self._FetchOwners(path,
                                 self._best_owners_cache,
                                 highest_score_only=True)

    def BatchListBestOwners(self, paths):
        """List only the highest-scoring owners for a group of files.

        Returns a dictionary {path: [owners]}.
        """
        # Materialize so that any iterable is accepted and can be sized.
        paths = list(paths)
        if not paths:
            return {}
        # Same pool sizing as BatchListOwners: never more workers than paths,
        # nor more than the connection limit declared by gerrit_util.
        nproc = min(gerrit_util.MAX_CONCURRENT_CONNECTION, len(paths))
        with git_common.ScopedPool(nproc, kind='threads') as pool:
            return dict(
                pool.imap_unordered(lambda p: (p, self.ListBestOwners(p)),
                                    paths))


def GetCodeOwnersClient(host, project, branch):
    """Create an OwnersClient backed by the code-owners plugin.

    Returns a GerritClient for |host|, |project| and |branch| when
    gerrit_util reports the code-owners plugin as enabled on |host|.

    Raises:
        Exception: if the code-owners plugin is not enabled on |host|.
    """
    plugin_enabled = gerrit_util.IsCodeOwnersEnabledOnHost(host)
    if plugin_enabled:
        return GerritClient(host, project, branch)

    error_template = (
        'code-owners plugin is not enabled. Ask your host admin to enable it '
        'on %s. Read more about code-owners at '
        'https://chromium-review.googlesource.com/'
        'plugins/code-owners/Documentation/index.html.')
    raise Exception(error_template % host)