File: appveyor-download.py

package info (click to toggle)
python-gevent 24.11.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 20,364 kB
  • sloc: python: 138,768; ansic: 87,807; sh: 12,548; makefile: 2,379; javascript: 108
file content (136 lines) | stat: -rw-r--r-- 4,494 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python
"""
Use the AppVeyor API to download Windows artifacts.

Taken from: https://bitbucket.org/ned/coveragepy/src/tip/ci/download_appveyor.py
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt
"""
import argparse
import os
import zipfile

import requests

# To delete:
# DELETE https://ci.appveyor.com/api/projects/{accountName}/{projectSlug}/buildcache
# requests.delete(make_url('/projects/denik/gevent/buildcache'), headers=make_auth_headers)

def make_auth_headers(fname=".appveyor.token"):
    """Make the authentication headers needed to use the Appveyor API."""
    if not os.path.exists(fname):
        fname = os.path.expanduser("~/bin/appveyor-token")
    if not os.path.exists(fname):
        raise RuntimeError(
            "Please create a file named `.appveyor.token` in the current directory. "
            "You can get the token from https://ci.appveyor.com/api-token"
        )
    with open(fname) as f:
        token = f.read().strip()

    headers = {
        'Authorization': 'Bearer {}'.format(token),
    }
    return headers


def make_url(url, **kwargs):
    """Build an Appveyor API url."""
    return "https://ci.appveyor.com/api" + url.format(**kwargs)


def get_project_build(account_project, build_num):
    """Get the details of the latest Appveyor build."""
    url = '/projects/{account_project}'
    url_args = {'account_project': account_project}
    if build_num:
        url += '/build/{buildVersion}'
        url_args['buildVersion'] = build_num
    url = make_url(url, **url_args)
    response = requests.get(url, headers=make_auth_headers())
    return response.json()


def download_latest_artifacts(account_project, build_num):
    """Download all the artifacts from the latest build."""
    build = get_project_build(account_project, build_num)
    jobs = build['build']['jobs']
    print("Build {0[build][version]}, {1} jobs: {0[build][message]}".format(build, len(jobs)))
    for job in jobs:
        name = job['name'].partition(':')[2].split(',')[0].strip()
        print("  {0}: {1[status]}, {1[artifactsCount]} artifacts".format(name, job))

        url = make_url("/buildjobs/{jobid}/artifacts", jobid=job['jobId'])
        response = requests.get(url, headers=make_auth_headers())
        artifacts = response.json()

        for artifact in artifacts:
            is_zip = artifact['type'] == "Zip"
            filename = artifact['fileName']
            print("    {0}, {1} bytes".format(filename, artifact['size']))

            url = make_url(
                "/buildjobs/{jobid}/artifacts/{filename}",
                jobid=job['jobId'],
                filename=filename
            )
            download_url(url, filename, make_auth_headers())

            if is_zip:
                unpack_zipfile(filename)
                os.remove(filename)


def ensure_dirs(filename):
    """Make sure the directories exist for `filename`."""
    dirname, _ = os.path.split(filename)
    if dirname and not os.path.exists(dirname):
        os.makedirs(dirname)


def download_url(url, filename, headers):
    """Download a file from `url` to `filename`."""
    ensure_dirs(filename)
    response = requests.get(url, headers=headers, stream=True)
    if response.status_code == 200:
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(16 * 1024):
                f.write(chunk)


def unpack_zipfile(filename):
    """Unpack a zipfile, using the names in the zip."""
    with open(filename, 'rb') as fzip:
        z = zipfile.ZipFile(fzip)
        for name in z.namelist():
            print("      extracting {}".format(name))
            ensure_dirs(name)
            z.extract(name)

def main(argv=None):
    import sys
    argv = argv or sys.argv[1:]

    parser = argparse.ArgumentParser(description='Download artifacts from AppVeyor.')
    parser.add_argument(
        'name',
        metavar='ID',
        help='Project ID in AppVeyor. Example: ionelmc/python-nameless'
    )
    parser.add_argument(
        'build',
        default=None,
        nargs='?',
        help=(
            'The project build version. If not given, discovers the latest. '
            'Note that this is not the build number. '
            'Example: 1.0.2420'
        )
    )

    args = parser.parse_args(argv)
    download_latest_artifacts(args.name, args.build)


if __name__ == "__main__":
    main()