File: get_maintainer.py

#!/usr/bin/env python3
#
# Copyright (c) 2019, Linaro Limited
#
# SPDX-License-Identifier: BSD-2-Clause
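#
# Illustrative invocations (a sketch based on the options defined in
# get_args() below; the script is expected to live one directory below the
# top of the optee_os tree, e.g. scripts/, see check_cwd(); the paths, patch
# file and PR number are placeholders):
#
#   ./scripts/get_maintainer.py core/crypto/
#   ./scripts/get_maintainer.py -m -p my_series.patch
#   ./scripts/get_maintainer.py -m -s -g 1234
#   ./scripts/get_maintainer.py -r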

from pathlib import PurePath
from urllib.request import urlopen

import argparse
import glob
import os
import re
import tempfile


DIFF_GIT_RE = re.compile(r'^diff --git a/(?P<path>.*) ')
REVIEWED_RE = re.compile(r'^Reviewed-by: (?P<approver>.*>)')
ACKED_RE = re.compile(r'^Acked-by: (?P<approver>.*>)')
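# PATCH_START matches the mbox 'From <commit-sha>' separator that
# 'git format-patch' emits at the start of each patch, for example
# (illustrative commit hash):
#   From 0123456789abcdef0123456789abcdef01234567 Mon Sep 17 00:00:00 2001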
PATCH_START = re.compile(r'^From [0-9a-f]{40}')


def get_args():
    parser = argparse.ArgumentParser(description='Print the maintainers for '
                                     'the given source files or directories; '
                                     'or for the files modified by a patch or '
                                     'a pull request. '
                                     '(With -m) Check if a patch or pull '
                                     'request is properly Acked/Reviewed for '
                                     'merging.')
    parser.add_argument('-m', '--merge-check', action='store_true',
                        help='use Reviewed-by: and Acked-by: tags found in '
                        'patches to prevent display of information for all '
                        'the approved paths.')
    parser.add_argument('-p', '--show-paths', action='store_true',
                        help='show all paths that are not approved.')
    parser.add_argument('-s', '--strict', action='store_true',
                        help='stricter conditions for patch approval check: '
                        'subsystem "THE REST" is ignored for paths that '
                        'match some other subsystem.')
    parser.add_argument('arg', nargs='*', help='file or patch')
    parser.add_argument('-f', '--file', action='append',
                        help='treat the following argument as a file path, not '
                        'a patch.')
    parser.add_argument('-g', '--github-pr', action='append', type=int,
                        help='Github pull request ID. The script will '
                        'download the patchset from Github to a temporary '
                        'file and process it.')
    parser.add_argument('-r', '--release-to', action='store_true',
                        help='show all the recipients to be used in release '
                        'announcement emails (i.e., maintainers, reviewers '
                        'and OP-TEE mailing list(s)) and exit.')
    return parser.parse_args()


def check_cwd():
    cwd = os.getcwd()
    parent = os.path.dirname(os.path.realpath(__file__)) + "/../"
    if os.path.realpath(cwd) != os.path.realpath(parent):
        print("Error: this script must be run from the top-level of the "
              "optee_os tree")
        exit(1)


# Parse MAINTAINERS and return a dictionary of subsystems such as:
# {'Subsystem name': {'R': ['foo', 'bar'], 'S': ['Maintained'],
#                     'F': [ 'path1', 'path2' ]}, ...}
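# For reference, an entry in MAINTAINERS is expected to look roughly like
# this (hypothetical maintainer; each letter is followed by ':' and a single
# separator character, typically a tab, as assumed by line[3:] below):
#   Example subsystem
#   M:	Jane Doe <jane@example.com>
#   S:	Maintained
#   F:	core/example/
# Parsing only starts after the first line of dashes ("----------").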
def parse_maintainers():
    subsystems = {}
    check_cwd()
    with open("MAINTAINERS", "r") as f:
        start_found = False
        ss = {}
        name = ''
        for line in f:
            line = line.strip()
            if not line:
                continue
            if not start_found:
                if line.startswith("----------"):
                    start_found = True
                continue

            if line[1] == ':':
                letter = line[0]
                if not ss.get(letter):
                    ss[letter] = []
                ss[letter].append(line[3:])
            else:
                if name:
                    subsystems[name] = ss
                name = line
                ss = {}
        if name:
            subsystems[name] = ss

    return subsystems


# If @patchset is a patchset file and contains 2 patches or more, write the
# individual patches to temporary files and return their paths.
# Otherwise return [].
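# For example (illustrative): splitting a file named 'my_series.patch' that
# holds three patches creates '1_my_series_patch_<random>.patch',
# '2_my_series_patch_<random>.patch' and '3_my_series_patch_<random>.patch'
# in the temporary directory, where <random> is added by
# tempfile.NamedTemporaryFile().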
def split_patchset(patchset):
    psname = os.path.basename(patchset).replace('.', '_')
    patchnum = 0
    of = None
    ret = []
    f = None
    try:
        f = open(patchset, "r")
    except OSError:
        return []
    for line in f:
        match = re.search(PATCH_START, line)
        if match:
            # New patch found: create new file
            patchnum += 1
            prefix = "{}_{}_".format(patchnum, psname)
            of = tempfile.NamedTemporaryFile(mode="w", prefix=prefix,
                                             suffix=".patch",
                                             delete=False)
            ret.append(of.name)
        if of:
            of.write(line)
    if len(ret) >= 2:
        return ret
    if len(ret) == 1:
        os.remove(ret[0])
    return []


# If @patch is a patch file, return the paths touched by the patch as well
# as the content of the Reviewed-by:/Acked-by: tags
def get_paths_from_patch(patch):
    paths = []
    approvers = []
    try:
        with open(patch, "r") as f:
            for line in f:
                match = re.search(DIFF_GIT_RE, line)
                if match:
                    p = match.group('path')
                    if p not in paths:
                        paths.append(p)
                    continue
                match = re.search(REVIEWED_RE, line)
                if match:
                    a = match.group('approver')
                    if a not in approvers:
                        approvers.append(a)
                    continue
                match = re.search(ACKED_RE, line)
                if match:
                    a = match.group('approver')
                    if a not in approvers:
                        approvers.append(a)
                    continue
    except Exception:
        pass
    return (paths, approvers)


# Does @path match @pattern?
# @pattern has the syntax defined in the Linux MAINTAINERS file -- mostly a
# shell glob pattern, except that a trailing slash means a directory and
# everything below. Matching can easily be done by converting to a regexp.
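# A few illustrative conversions (hypothetical paths and patterns):
#   'core/'   becomes r'^core/.*$'     and matches 'core/crypto/aes-gcm.c'
#   'mk/*.mk' becomes r'^mk/[^/]+.mk$' and matches 'mk/compile.mk',
#                                      but not 'mk/sub/compile.mk'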
def match_pattern(path, pattern):
    # Append a trailing slash if path is an existing directory, so that it
    # matches F: entries such as 'foo/bar/'
    if not path.endswith('/') and os.path.isdir(path):
        path += '/'
    rep = "^" + pattern
    rep = rep.replace('*', '[^/]+')
    rep = rep.replace('?', '[^/]')
    if rep.endswith('/'):
        rep += '.*'
    rep += '$'
    return bool(re.match(rep, path))


def get_subsystems_for_path(subsystems, path, strict):
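    # Return the subset of @subsystems whose 'F:' patterns match @path and
    # whose 'X:' (exclude) patterns do not. With @strict, the catch-all
    # "THE REST" entry is dropped when at least one other subsystem matches.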
    found = {}
    for key in subsystems:
        def inner():
            excluded = subsystems[key].get('X')
            if excluded:
                for pattern in excluded:
                    if match_pattern(path, pattern):
                        return  # next key
            included = subsystems[key].get('F')
            if not included:
                return  # next key
            for pattern in included:
                if match_pattern(path, pattern):
                    found[key] = subsystems[key]
        inner()
    if strict and len(found) > 1:
        found.pop('THE REST', None)
    return found


def get_ss_maintainers(subsys):
    return subsys.get('M') or []


def get_ss_reviewers(subsys):
    return subsys.get('R') or []


def get_ss_approvers(ss):
    return get_ss_maintainers(ss) + get_ss_reviewers(ss)


def get_ss_lists(subsys):
    return subsys.get('L') or []


def approvers_have_approved(approved_by, approvers):
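    # Return True if at least one of @approvers appears among @approved_by.
    # Illustrative example (hypothetical identity): the MAINTAINERS entry
    # 'Jane Doe <jane@example.com> (jdoe on Github)' matches the tag value
    # 'Jane Doe <jane@example.com>', since both sides are compared only up
    # to the closing '>' of the e-mail address.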
    for n in approvers:
        # Ignore anything after the email (Github ID...)
        n = n.split('>', 1)[0]
        for m in approved_by:
            m = m.split('>', 1)[0]
            if n == m:
                return True
    return False


def download(pr):
    url = "https://github.com/OP-TEE/optee_os/pull/{}.patch".format(pr)
    f = tempfile.NamedTemporaryFile(mode="wb", prefix="pr{}_".format(pr),
                                    suffix=".patch", delete=False)
    print("Downloading {}...".format(url), end='', flush=True)
    f.write(urlopen(url).read())
    print(" Done.")
    return f.name


def show_release_to(subsystems):
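    # Print, comma-separated, every M:/R: e-mail address found in MAINTAINERS
    # plus the mailing list(s) of "THE REST", i.e. the recipients of release
    # announcement e-mails (see the -r/--release-to option).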
    check_cwd()
    with open("MAINTAINERS", "r") as f:
        emails = sorted(set(re.findall(r'[RM]:\t(.*[\w]*<[\w\.-]+@[\w\.-]+>)',
                                       f.read())))
        emails += get_ss_lists(subsystems["THE REST"])
    print(*emails, sep=', ')


def main():
    global args

    args = get_args()

    all_subsystems = parse_maintainers()

    if args.release_to:
        show_release_to(all_subsystems)
        return

    paths = []
    arglist = []
    downloads = []
    split_patches = []

    for pr in args.github_pr or []:
        downloads += [download(pr)]

    for arg in args.arg + downloads:
        if os.path.exists(arg):
            patches = split_patchset(arg)
            if patches:
                split_patches += patches
                continue
        arglist.append(arg)

    for arg in arglist + split_patches:
        patch_paths = []
        approved_by = []
        if os.path.exists(arg):
            # Try to parse as a patch
            (patch_paths, approved_by) = get_paths_from_patch(arg)
        if not patch_paths:
            # Not a patch, consider the path itself
            # as_posix() cleans the path a little bit (strips leading './'
            # and duplicate slashes...)
            patch_paths = [PurePath(arg).as_posix()]
        for path in patch_paths:
            approved = False
            if args.merge_check:
                ss_for_path = get_subsystems_for_path(all_subsystems, path,
                                                      args.strict)
                for key in ss_for_path:
                    ss_approvers = get_ss_approvers(ss_for_path[key])
                    if approvers_have_approved(approved_by, ss_approvers):
                        approved = True
            if not approved:
                paths += [path]

    for f in downloads + split_patches:
        os.remove(f)

    if args.file:
        paths += args.file

    if args.show_paths:
        print(paths)

    ss = {}
    for path in paths:
        ss.update(get_subsystems_for_path(all_subsystems, path, args.strict))
    for key in ss:
        ss_name = key[:50] + (key[50:] and '...')
        for name in ss[key].get('M') or []:
            print("{} (maintainer:{})".format(name, ss_name))
        for name in ss[key].get('R') or []:
            print("{} (reviewer:{})".format(name, ss_name))


if __name__ == "__main__":
    main()