File: simpleget

package info
snapd 2.72-1
  • area: main
  • in suites: sid
  • size: 80,412 kB
  • sloc: sh: 16,506; ansic: 16,211; python: 11,213; makefile: 1,919; exp: 190; awk: 58; xml: 22
file content (102 lines) | stat: -rwxr-xr-x 3,005 bytes

#!/usr/bin/env python3

# note this script needs to run on Python as old as 3.6
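# Fetch a single URL to a local file: download into a temporary file in the
# target directory and rename it into place only when the transfer succeeds.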
import os
import argparse
import logging
import datetime
import tempfile
from urllib import request
from urllib.parse import urlparse


def parse_arguments() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="simple file getter")
    parser.add_argument("-o", "--output", help="output file name")
    parser.add_argument("-v", "--verbose", action='store_true', help="set verbose output")
    parser.add_argument("-H", "--header", action="append", help="set a header")
    parser.add_argument("URL", help="download URL")
    return parser.parse_args()


def main() -> None:
    opts = parse_arguments()

    loglevel = logging.ERROR
    if opts.verbose:
        loglevel = logging.DEBUG
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(message)s", level=loglevel
    )

    fromurl = urlparse(opts.URL).path
    output = os.path.basename(fromurl)
    if opts.output:
        output = opts.output

    if os.path.exists(output):
        raise RuntimeError("output path {} already exists".format(output))

    total = 0

    def _report(sofar: int, tot: int) -> None:
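        # remember the advertised total size and log progress so far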
        nonlocal total
        total = tot
        logging.debug("got %d/%d kB", sofar / 1024.0, total / 1024.0)

    # create a temp in the same directory as output
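    # (keeps the final os.rename() on the same filesystem, so the move is atomic)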
    outdir = os.path.dirname(output)

    headers = {}
    if opts.header:
        for hdr in opts.header:
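            # headers are expected curl-style, e.g. -H "Accept: text/plain"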
            h, _, v = hdr.partition(": ")
            if v == "":
                raise RuntimeError("invalid header {}".format(hdr))
            headers[h] = v

    req = request.Request(opts.URL, headers=headers)
    with tempfile.NamedTemporaryFile(mode="wb", dir=outdir, delete=False) as outf:
        name = outf.name

        try:
            now = datetime.datetime.now()
            # recreate what urlretrieve() does
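            # stream the body in 8 kB chunks, reporting progress as we go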
            with request.urlopen(req) as rsp:
                bsize = 8 * 1024
                read = 0
                size = -1
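                # -1 means the total size is not known yet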

                if "Content-Length" in rsp.headers:
                    size = int(rsp.headers["Content-Length"])

                _report(read, size)
                while True:
                    data = rsp.read(bsize)
                    if len(data) == 0:
                        # done
                        break

                    read += len(data)
                    outf.write(data)
                    _report(read, size)

                if size > 0 and read < size:
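                    # the server closed the stream before delivering everything
                    # that Content-Length promised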
                    raise request.ContentTooShortError(
                        "got {} out of {} bytes".format(read, size),
                        (),
                    )

            after = datetime.datetime.now()

            os.rename(name, output)
        except:
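            # remove the partial temp file, then re-raise the original error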
            os.unlink(name)
            raise

    # report the bytes actually written; the Content-Length total may be absent
    speed = float(read) / (after - now).total_seconds() / 1024.0
    logging.info("wrote %d bytes to %s, speed %.02f kB/s", read, output, speed)


if __name__ == "__main__":
    main()