File: github_repository_manager.py

package info (click to toggle)
azure-functions-devops-build 0.0.22-6
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 556 kB
  • sloc: python: 2,422; sh: 12; makefile: 8
file content (107 lines) | stat: -rw-r--r-- 4,116 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import base64
from ..exceptions import (
    GithubContentNotFound,
    GithubIntegrationRequestError,
    GithubUnauthorizedError,
)
from ..base.base_github_manager import BaseGithubManager


class GithubRepositoryManager(BaseGithubManager):

    def check_github_repository(self, repository_fullname):
        header_parameters = self.construct_github_request_header()
        request = self._client.get('/repos/{repo}'.format(repo=repository_fullname))
        response = self._client.send(request, header_parameters)
        if response.status_code // 100 == 2:
            return True
        return False

    def check_github_file(self, repository_fullname, file_path):
        header_parameters = self.construct_github_request_header()
        request = self._client.get('/repos/{repo}/contents/{path}'.format(
            repo=repository_fullname,
            path=file_path
        ))
        response = self._client.send(request, header_parameters)
        return response.status_code // 100 == 2

    def get_content(self, repository_fullname, file_path, get_metadata=True):
        header_parameters = self.construct_github_request_header()

        if get_metadata:  # Get files metadata
            header_parameters['Content-Type'] = 'application/json'
        else:  # Get files content
            header_parameters['Accept'] = 'application/vnd.github.v3.raw'

        request = self._client.get('/repos/{repo}/contents/{path}'.format(
            repo=repository_fullname,
            path=file_path
        ))
        response = self._client.send(request, header_parameters)

        # The response is a Json content
        if response.status_code // 100 == 2:
            return response.json()

        if response.status_code == 401:
            raise GithubUnauthorizedError('Failed to read {repo}/{path}'.format(
                repo=repository_fullname,
                path=file_path
            ))

        if response.status_code == 404:
            raise GithubContentNotFound('Failed to find {repo}/{path}'.format(
                repo=repository_fullname,
                path=file_path
            ))

        raise GithubIntegrationRequestError(response.status_code)

    def put_content(self, repository_fullname, file_path, data):
        header_parameters = self.construct_github_request_header()
        header_parameters['Content-Type'] = 'Application/Json'
        request = self._client.put(
            url='/repos/{repo}/contents/{path}'.format(repo=repository_fullname, path=file_path),
            headers=header_parameters,
            content=data
        )

        response = self._client.send(request)
        if response.status_code // 100 == 2:
            return response

        if response.status_code == 401:
            raise GithubUnauthorizedError('Failed to write {repo}/{path}'.format(
                repo=repository_fullname,
                path=file_path
            ))

        if response.status_code == 404:
            raise GithubContentNotFound('Failed to find {repo}/{path}'.format(
                repo=repository_fullname,
                path=file_path
            ))

        raise GithubIntegrationRequestError("{res.status_code} {res.url}".format(res=response))

    def commit_file(self, repository_fullname, file_path, commit_message, file_data, sha=None, encode='utf-8'):
        data = {
            "branch": "master",
            "message": "{message}".format(message=commit_message),
            "content": base64.b64encode(bytes(file_data.encode(encode))).decode('ascii'),
        }

        if sha:
            data["sha"] = sha

        return self.put_content(
            repository_fullname=repository_fullname,
            file_path=file_path,
            data=data
        )