1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
|
# Copyright (c) 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import random
import gerrit_util
import git_common
class OwnersClient(object):
"""Interact with OWNERS files in a repository.
This class allows you to interact with OWNERS files in a repository both the
Gerrit Code-Owners plugin REST API, and the owners database implemented by
Depot Tools in owners.py:
- List all the owners for a group of files.
- Check if files have been approved.
- Suggest owners for a group of files.
All code should use this class to interact with OWNERS files instead of the
owners database in owners.py
"""
# '*' means that everyone can approve.
EVERYONE = '*'
# Possible status of a file.
# - INSUFFICIENT_REVIEWERS: The path needs owners approval, but none of its
# owners is currently a reviewer of the change.
# - PENDING: An owner of this path has been added as reviewer, but approval
# has not been given yet.
# - APPROVED: The path has been approved by an owner.
APPROVED = 'APPROVED'
PENDING = 'PENDING'
INSUFFICIENT_REVIEWERS = 'INSUFFICIENT_REVIEWERS'
def ListOwners(self, path):
"""List all owners for a file.
The returned list is sorted so that better owners appear first.
"""
raise Exception('Not implemented')
def BatchListOwners(self, paths):
"""List all owners for a group of files.
Returns a dictionary {path: [owners]}.
"""
if not paths:
return dict()
nproc = min(gerrit_util.MAX_CONCURRENT_CONNECTION, len(paths))
with git_common.ScopedPool(nproc, kind='threads') as pool:
return dict(
pool.imap_unordered(lambda p: (p, self.ListOwners(p)), paths))
def GetFilesApprovalStatus(self, paths, approvers, reviewers):
"""Check the approval status for the given paths.
Utility method to check for approval status when a change has not yet
been created, given reviewers and approvers.
See GetChangeApprovalStatus for description of the returned value.
"""
approvers = set(approvers)
if approvers:
approvers.add(self.EVERYONE)
reviewers = set(reviewers)
if reviewers:
reviewers.add(self.EVERYONE)
status = {}
owners_by_path = self.BatchListOwners(paths)
for path, owners in owners_by_path.items():
owners = set(owners)
if owners.intersection(approvers):
status[path] = self.APPROVED
elif owners.intersection(reviewers):
status[path] = self.PENDING
else:
status[path] = self.INSUFFICIENT_REVIEWERS
return status
def ScoreOwners(self, paths, exclude=None):
"""Get sorted list of owners for the given paths."""
if not paths:
return []
exclude = exclude or []
owners = []
queues = self.BatchListOwners(paths).values()
for i in range(max(len(q) for q in queues)):
for q in queues:
if i < len(q) and q[i] not in owners and q[i] not in exclude:
owners.append(q[i])
return owners
def SuggestOwners(self, paths, exclude=None):
"""Suggest a set of owners for the given paths."""
exclude = exclude or []
paths_by_owner = {}
owners_by_path = self.BatchListOwners(paths)
for path, owners in owners_by_path.items():
for owner in owners:
paths_by_owner.setdefault(owner, set()).add(path)
selected = []
missing = set(paths)
for owner in self.ScoreOwners(paths, exclude=exclude):
missing_len = len(missing)
missing.difference_update(paths_by_owner[owner])
if missing_len > len(missing):
selected.append(owner)
if not missing:
break
return selected
def SuggestMinimalOwners(self,
paths: list[str],
exclude: list[str] = None) -> list[str]:
"""
Suggest a set of owners for the given paths. Never return an owner in
the |exclude| list.
Aims to provide only one, but will provide more if it's unable to
find a common owner.
"""
exclude = exclude or []
owners_by_path = self.BatchListOwners(paths)
if not owners_by_path:
return []
common_owners = set(owners_by_path.popitem()[1]) - set(exclude)
for _, owners in owners_by_path.items():
common_owners = common_owners.intersection(set(owners))
if not common_owners:
# This likely means some of the files had `noparent` set.
# Fall back to the default suggestion algorithm, which accounts
# for noparent but is liable to return many different owners
return self.SuggestOwners(paths, exclude)
# Return an arbitrary common owner, preferring those with a good score
sorted_common_owners = [
owner for owner in self.ScoreOwners(paths, exclude=exclude)
if owner in common_owners
]
# Return a singleton list so this function has a consistent return type
return sorted_common_owners[:1]
class GerritClient(OwnersClient):
"""Implement OwnersClient using OWNERS REST API."""
def __init__(self, host, project, branch):
super(GerritClient, self).__init__()
self._host = host
self._project = project
self._branch = branch
self._owners_cache = {}
self._best_owners_cache = {}
# Seed used by Gerrit to shuffle code owners that have the same score.
# Can be used to make the sort order stable across several requests,
# e.g. to get the same set of random code owners for different file
# paths that have the same code owners.
self._seed = random.getrandbits(30)
def _FetchOwners(self, path, cache, highest_score_only=False):
# Always use slashes as separators.
path = path.replace(os.sep, '/')
if path not in cache:
# GetOwnersForFile returns a list of account details sorted by order
# of best reviewer for path. If owners have the same score, the
# order is random, seeded by `self._seed`.
data = gerrit_util.GetOwnersForFile(
self._host,
self._project,
self._branch,
path,
resolve_all_users=False,
highest_score_only=highest_score_only,
seed=self._seed)
cache[path] = [
d['account']['email'] for d in data['code_owners']
if 'account' in d and 'email' in d['account']
]
# If owned_by_all_users is true, add everyone as an owner at the end
# of the owners list.
if data.get('owned_by_all_users', False):
cache[path].append(self.EVERYONE)
return cache[path]
def ListOwners(self, path):
return self._FetchOwners(path, self._owners_cache)
def ListBestOwners(self, path):
return self._FetchOwners(path,
self._best_owners_cache,
highest_score_only=True)
def BatchListBestOwners(self, paths):
"""List only the higest-scoring owners for a group of files.
Returns a dictionary {path: [owners]}.
"""
with git_common.ScopedPool(kind='threads') as pool:
return dict(
pool.imap_unordered(lambda p: (p, self.ListBestOwners(p)),
paths))
def GetCodeOwnersClient(host, project, branch):
"""Get a new OwnersClient.
Uses GerritClient and raises an exception if code-owners plugin is not
available."""
if gerrit_util.IsCodeOwnersEnabledOnHost(host):
return GerritClient(host, project, branch)
raise Exception(
'code-owners plugin is not enabled. Ask your host admin to enable it '
'on %s. Read more about code-owners at '
'https://chromium-review.googlesource.com/'
'plugins/code-owners/Documentation/index.html.' % host)
|