1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
|
#!/usr/bin/env python3
# note this script needs to run on Python as old as 3.6
import os.path
import argparse
import logging
import datetime
import tempfile
from urllib import request
from urllib.parse import urlparse
def parse_arguments() -> argparse.Namespace:
    """Parse the command line of the file getter.

    Returns the namespace produced by argparse with the attributes
    ``output``, ``verbose``, ``header`` (a list, or None when no -H option
    was given) and ``URL``.
    """
    cli = argparse.ArgumentParser(description="simple file getter")
    # (flags, keyword arguments) pairs, registered in the order they are listed
    argument_specs = (
        (("-o", "--output"), {"help": "output file name"}),
        (
            ("-v", "--verbose"),
            {"action": "store_true", "help": "set verbose output"},
        ),
        (("-H", "--header"), {"action": "append", "help": "set a header"}),
        (("URL",), {"help": "download URL"}),
    )
    for flags, settings in argument_specs:
        cli.add_argument(*flags, **settings)
    return cli.parse_args()
def _parse_headers(raw_headers) -> dict:
    """Turn the repeated ``-H "Name: value"`` options into a header dict.

    *raw_headers* is the list collected by argparse, or None when no header
    option was given. Raises RuntimeError for an entry that lacks the
    ``": "`` separator or has an empty name or value.
    """
    headers = {}
    for hdr in raw_headers or []:
        name, _, value = hdr.partition(": ")
        if not name or not value:
            raise RuntimeError("invalid header {}".format(hdr))
        headers[name] = value
    return headers


def _copy_response(rsp, outf, bsize: int = 8 * 1024) -> int:
    """Stream the body of *rsp* into *outf*, recreating what urlretrieve() does.

    Returns the number of bytes actually written. Raises
    ContentTooShortError when the response announced a Content-Length and
    fewer bytes than that were received.
    """
    # -1 means the server did not announce the size
    size = -1
    if "Content-Length" in rsp.headers:
        size = int(rsp.headers["Content-Length"])

    received = 0
    logging.debug("got %d/%d kB", received / 1024.0, size / 1024.0)
    while True:
        data = rsp.read(bsize)
        if not data:
            # done
            break
        received += len(data)
        outf.write(data)
        logging.debug("got %d/%d kB", received / 1024.0, size / 1024.0)

    if size > 0 and received < size:
        raise request.ContentTooShortError(
            "got {} out of {} bytes".format(received, size),
            (),
        )
    return received


def main() -> None:
    """Download the URL given on the command line into a local file.

    The output name is taken from --output or, failing that, from the last
    component of the URL path. Data is first written to a temporary file in
    the output directory and renamed to the final name only after the whole
    body was received; the temporary file is removed on any failure.

    Raises RuntimeError when no output name can be derived, when the output
    path already exists or when a header option is malformed, and
    ContentTooShortError when the download is truncated. Errors from
    urlopen() and from file operations propagate unchanged.
    """
    opts = parse_arguments()
    loglevel = logging.DEBUG if opts.verbose else logging.ERROR
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(message)s", level=loglevel
    )

    # an explicit --output wins over the name derived from the URL
    output = opts.output or os.path.basename(urlparse(opts.URL).path)
    if not output:
        # e.g. http://host/ has no usable basename; fail before downloading
        raise RuntimeError(
            "cannot derive output file name from {}, use --output".format(opts.URL)
        )
    if os.path.exists(output):
        raise RuntimeError("output path {} already exists".format(output))

    req = request.Request(opts.URL, headers=_parse_headers(opts.header))

    # create a temp in the same directory as output
    outdir = os.path.dirname(output) or "."
    outf = tempfile.NamedTemporaryFile(mode="wb", dir=outdir, delete=False)
    name = outf.name
    try:
        # the file is closed (and thus flushed) before it gets renamed
        with outf:
            now = datetime.datetime.now()
            with request.urlopen(req) as rsp:
                received = _copy_response(rsp, outf)
            after = datetime.datetime.now()
        os.rename(name, output)
    except BaseException:
        # also clean up on KeyboardInterrupt; never leave a partial file
        try:
            os.unlink(name)
        except OSError as err:
            # do not let a failed cleanup hide the original error
            logging.debug("cannot remove temporary file %s: %s", name, err)
        raise

    elapsed = (after - now).total_seconds()
    # report the bytes actually received, not the announced Content-Length
    speed = received / elapsed / 1024.0 if elapsed > 0 else 0.0
    logging.info(
        "wrote %d bytes to %s, speed %.02f kB/s", received, output, speed
    )
# Run the download only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|