File: github_tools.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (357 lines) | stat: -rw-r--r-- 13,398 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
"""Github tools.
"""

from contextlib import contextmanager
import logging
import os
from pathlib import Path
import shutil
import stat
from subprocess import CalledProcessError
import traceback
from urllib.parse import urlsplit, urlunsplit

from github import Github, GithubException
from git import Repo

from .git_tools import clone_to_path as _git_clone_to_path, checkout_with_fetch

_LOGGER = logging.getLogger(__name__)


class ExceptionContext:  # pylint: disable=too-few-public-methods
    def __init__(self):
        self.comment = None


@contextmanager
def exception_to_github(github_obj_to_comment, summary=""):
    """If any exception comes, log them in the given Github obj."""
    context = ExceptionContext()
    try:
        yield context
    except Exception:  # pylint: disable=broad-except
        if summary:
            summary = ": ({})".format(summary)
        error_type = "an unknown error"
        try:
            raise
        except CalledProcessError as err:
            error_type = "a Subprocess error"
            content = "Command: {}\n".format(err.cmd)
            content += "Finished with return code {}\n".format(err.returncode)
            if err.output:
                content += "and output:\n```shell\n{}\n```".format(err.output)
            else:
                content += "and no output"
        except Exception:  # pylint: disable=broad-except
            content = "```python\n{}\n```".format(traceback.format_exc())
        response = "<details><summary>Encountered {}{}</summary><p>\n\n".format(error_type, summary)
        response += content
        response += "\n\n</p></details>"
        context.comment = create_comment(github_obj_to_comment, response)


def user_from_token(gh_token):
    """Get user login from GitHub token"""
    github_con = Github(gh_token)
    return github_con.get_user()


def create_comment(github_object, body):
    """Create a comment, whatever the object is a PR, a commit or an issue."""
    try:
        return github_object.create_issue_comment(body)  # It's a PR
    except AttributeError:
        return github_object.create_comment(body)  # It's a commit/issue


def get_comments(github_object):
    """Get a list of comments, whater the object is a PR, a commit or an issue."""
    try:
        return github_object.get_issue_comments()  # It's a PR
    except AttributeError:
        return github_object.get_comments()  # It's a commit/issue


def get_files(github_object):
    """Get files from a PR or a commit."""
    try:
        return github_object.get_files()  # Try as a PR object
    except AttributeError:
        return github_object.files  # Try as a commit object


def configure_user(gh_token, repo):
    """git config --global user.email "you@example.com"
    git config --global user.name "Your Name"
    """
    user = user_from_token(gh_token)
    repo.git.config("user.email", user.email or "adxpysdk@microsoft.com")
    repo.git.config("user.name", user.name or "SwaggerToSDK Automation")


def get_full_sdk_id(gh_token, sdk_git_id):
    """If the SDK git id is incomplete, try to complete it with user login"""
    if not "/" in sdk_git_id:
        login = user_from_token(gh_token).login
        return "{}/{}".format(login, sdk_git_id)
    return sdk_git_id


def sync_fork(gh_token, github_repo_id, repo, push=True):
    """Sync the current branch in this fork against the direct parent on Github"""
    if not gh_token:
        _LOGGER.warning("Skipping the upstream repo sync, no token")
        return
    _LOGGER.info("Check if repo has to be sync with upstream")
    github_con = Github(gh_token)
    github_repo = github_con.get_repo(github_repo_id)

    if not github_repo.parent:
        _LOGGER.warning("This repo has no upstream")
        return

    upstream_url = "https://github.com/{}.git".format(github_repo.parent.full_name)
    upstream = repo.create_remote("upstream", url=upstream_url)
    upstream.fetch()
    active_branch_name = repo.active_branch.name
    if not active_branch_name in repo.remotes.upstream.refs:
        _LOGGER.info("Upstream has no branch %s to merge from", active_branch_name)
        return
    else:
        _LOGGER.info("Merge from upstream")
    msg = repo.git.rebase("upstream/{}".format(repo.active_branch.name))
    _LOGGER.debug(msg)
    if push:
        msg = repo.git.push()
        _LOGGER.debug(msg)


def get_or_create_pull(github_repo, title, body, head, base, *, none_if_no_commit=False):
    """Try to create the PR. If the PR exists, try to find it instead. Raises otherwise.

    You should always use the complete head syntax "org:branch", since the syntax is required
    in case of listing.

    if "none_if_no_commit" is set, return None instead of raising exception if the problem
    is that head and base are the same.
    """
    try:  # Try to create or get a PR
        return github_repo.create_pull(title=title, body=body, head=head, base=base)
    except GithubException as err:
        err_message = err.data["errors"][0].get("message", "")
        if err.status == 422 and err_message.startswith("A pull request already exists"):
            _LOGGER.info("PR already exists, get this PR")
            return list(github_repo.get_pulls(head=head, base=base))[0]
        elif none_if_no_commit and err.status == 422 and err_message.startswith("No commits between"):
            _LOGGER.info("No PR possible since head %s and base %s are the same", head, base)
            return None
        else:
            _LOGGER.warning("Unable to create PR:\n%s", err.data)
            raise
    except Exception as err:
        response = traceback.format_exc()
        _LOGGER.warning("Unable to create PR:\n%s", response)
        raise


def clone_to_path(gh_token, folder, sdk_git_id, branch_or_commit=None, *, pr_number=None):
    """Clone the given repo_id to the folder.

    If PR number is specified fetch the magic branches
    pull/<id>/head or pull/<id>/merge from Github. "merge" is tried first, and fallback to "head".
    Beware that pr_number implies detached head, and then no push is possible.

    If branch is specified, checkout this branch or commit finally.

    :param str branch_or_commit: If specified, switch to this branch/commit.
    :param int pr_number: PR number.
    """
    _LOGGER.info("Clone SDK repository %s", sdk_git_id)
    url_parsing = urlsplit(sdk_git_id)
    sdk_git_id = url_parsing.path
    if sdk_git_id.startswith("/"):
        sdk_git_id = sdk_git_id[1:]

    credentials_part = ""
    if gh_token:
        login = user_from_token(gh_token).login
        credentials_part = "{user}:{token}@".format(user=login, token=gh_token)
    else:
        _LOGGER.warning("Will clone the repo without writing credentials")

    https_authenticated_url = "https://{credentials}github.com/{sdk_git_id}.git".format(
        credentials=credentials_part, sdk_git_id=sdk_git_id
    )
    # Clone the repo
    _git_clone_to_path(https_authenticated_url, folder)
    # If this is a PR, do some fetch to improve the number of SHA1 available
    if pr_number:
        try:
            checkout_with_fetch(folder, "pull/{}/merge".format(pr_number))
            return
        except Exception:  # pylint: disable=broad-except
            pass  # Assume "merge" doesn't exist anymore, fetch "head"
        checkout_with_fetch(folder, "pull/{}/head".format(pr_number))
    # If there is SHA1, checkout it. If PR number was given, SHA1 could be inside that PR.
    if branch_or_commit:
        repo = Repo(str(folder))
        repo.git.checkout(branch_or_commit)


def do_pr(
    gh_token, sdk_git_id, sdk_pr_target_repo_id, branch_name, base_branch, pr_body=""
):  # pylint: disable=too-many-arguments
    "Do the PR"
    if not gh_token:
        _LOGGER.info("Skipping the PR, no token found")
        return None
    if not sdk_pr_target_repo_id:
        _LOGGER.info("Skipping the PR, no target repo id")
        return None

    github_con = Github(gh_token)
    sdk_pr_target_repo = github_con.get_repo(sdk_pr_target_repo_id)

    if "/" in sdk_git_id:
        sdk_git_owner = sdk_git_id.split("/")[0]
        _LOGGER.info("Do the PR from %s", sdk_git_owner)
        head_name = "{}:{}".format(sdk_git_owner, branch_name)
    else:
        head_name = branch_name
        sdk_git_repo = github_con.get_repo(sdk_git_id)
        sdk_git_owner = sdk_git_repo.owner.login

    try:
        github_pr = sdk_pr_target_repo.create_pull(
            title="Automatic PR from {}".format(branch_name), body=pr_body, head=head_name, base=base_branch
        )
    except GithubException as err:
        if err.status == 422 and err.data["errors"][0].get("message", "").startswith("A pull request already exists"):
            matching_pulls = sdk_pr_target_repo.get_pulls(base=base_branch, head=sdk_git_owner + ":" + head_name)
            matching_pull = matching_pulls[0]
            _LOGGER.info("PR already exists: %s", matching_pull.html_url)
            return matching_pull
        raise
    _LOGGER.info("Made PR %s", github_pr.html_url)
    return github_pr


def remove_readonly(func, path, _):
    "Clear the readonly bit and reattempt the removal"
    os.chmod(path, stat.S_IWRITE)
    func(path)


@contextmanager
def manage_git_folder(gh_token, temp_dir, git_id, *, pr_number=None):
    """Context manager to avoid readonly problem while cleanup the temp dir.

    If PR number is given, use magic branches "pull" from Github.
    """
    _LOGGER.debug("Git ID %s", git_id)
    if Path(git_id).exists():
        yield git_id
        return  # Do not erase a local folder, just skip here

    # Clone the specific branch
    split_git_id = git_id.split("@")
    branch = split_git_id[1] if len(split_git_id) > 1 else None
    clone_to_path(gh_token, temp_dir, split_git_id[0], branch_or_commit=branch, pr_number=pr_number)
    try:
        yield temp_dir
        # Pre-cleanup for Windows http://bugs.python.org/issue26660
    finally:
        _LOGGER.debug("Preclean Rest folder")
        shutil.rmtree(temp_dir, onerror=remove_readonly)


class GithubLink:
    def __init__(self, gitid, link_type, branch_or_commit, path, token=None):  # pylint: disable=too-many-arguments
        self.gitid = gitid
        self.link_type = link_type
        self.branch_or_commit = branch_or_commit
        self.path = path
        self.token = token

    @classmethod
    def from_string(cls, github_url):
        parsed = urlsplit(github_url)
        netloc = parsed.netloc
        if "@" in netloc:
            token, netloc = netloc.split("@")
        else:
            token = None

        split_path = parsed.path.split("/")
        split_path.pop(0)  # First is always empty
        gitid = split_path.pop(0) + "/" + split_path.pop(0)
        link_type = split_path.pop(0) if netloc != "raw.githubusercontent.com" else "raw"
        branch_or_commit = split_path.pop(0)
        path = "/".join(split_path)
        return cls(gitid, link_type, branch_or_commit, path, token)

    def __repr__(self):
        if self.link_type == "raw":
            netloc = "raw.githubusercontent.com"
            path = "/".join(["", self.gitid, self.branch_or_commit, self.path])
            # If raw and token, needs to be passed with "Authorization: token <token>", so nothing to do here
        else:
            netloc = "github.com" if not self.token else self.token + "@github.com"
            path = "/".join(["", self.gitid, self.link_type, self.branch_or_commit, self.path])
        return urlunsplit(("https", netloc, path, "", ""))

    def as_raw_link(self):
        """Returns a GithubLink to a raw content."""
        if self.link_type == "raw":
            return self  # Can be discussed if we need an hard copy, or fail
        if self.link_type != "blob":
            raise ValueError("Cannot get a download link from a tree link")
        return self.__class__(self.gitid, "raw", self.branch_or_commit, self.path, self.token)


class DashboardCommentableObject:  # pylint: disable=too-few-public-methods
    def __init__(self, issue_or_pr, header):
        self._issue_or_pr = issue_or_pr
        self._header = header

    def create_comment(self, text):
        """Mimic issue API, so we can use it everywhere.
        Return dashboard comment.
        """
        return DashboardComment.get_or_create(self._issue_or_pr, self._header, text)


class DashboardComment:
    def __init__(self, github_comment, header):
        self.github_comment = github_comment
        self._header = header

    @classmethod
    def get_or_create(cls, issue, header, text=None):
        """Get or create the dashboard comment in this issue."""
        for comment in get_comments(issue):
            try:
                if comment.body.splitlines()[0] == header:
                    obj = cls(comment, header)
                    break
            except IndexError:  # The comment body is empty
                pass
        # Hooooooo, no dashboard comment, let's create one
        else:
            comment = create_comment(issue, header)
            obj = cls(comment, header)
        if text:
            obj.edit(text)
        return obj

    def edit(self, text):
        self.github_comment.edit(self._header + "\n" + text)

    @property
    def body(self):
        return self.github_comment.body[len(self._header + "\n") :]

    def delete(self):
        self.github_comment.delete()