1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
|
#!/usr/bin/env python
"""
Implements a clone of the built in http.server functionality but adds the
ability to download multiple files and directories as zip files.
Run `zipserver --help` or `python -m zipstream.server --help` for details
WARNING: zipstream.server is not recommended for production. It only implements
basic security checks.
"""
import contextlib
import functools
import html
from http import HTTPStatus
import http.server
import io
import os
import sys
import urllib
from zipstream.ng import ZipStream
class ZippingHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
def list_directory(self, path):
"""
A clone of `http.server.SimpleHTTPRequestHandler.list_directory` method
with slight modifications to add checkboxes beside each entry and a
download button at the bottom that submits the checked files as a POST
request.
"""
# Additions to the original `list_directory` are marked with `ADDED`
# comments.
try:
filelist = os.listdir(path)
except OSError:
self.send_error(HTTPStatus.NOT_FOUND, "No permission to list directory")
return None
try:
displaypath = urllib.parse.unquote(self.path, errors='surrogatepass')
except UnicodeDecodeError:
displaypath = urllib.parse.unquote(path)
displaypath = html.escape(displaypath, quote=False)
title = 'Directory listing for %s' % displaypath
enc = sys.getfilesystemencoding()
r = []
r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">')
r.append('<html>')
r.append('<head>')
r.append('<meta http-equiv="Content-Type" content="text/html; charset=%s">' % enc)
r.append('<title>%s</title>' % title)
r.append('</head>')
r.append('<body>')
r.append('<h1>%s</h1>' % title)
r.append('<hr>')
r.append('<form method="post">') # ADDED
r.append('<ul>')
for name in sorted(filelist, key=lambda x: x.lower()):
fullname = os.path.join(path, name)
displayname = linkname = name
# Append / for directories or @ for symbolic links
if os.path.isdir(fullname):
displayname = name + "/"
linkname = name + "/"
if os.path.islink(fullname):
displayname = name + "@"
# Note: a link to a directory displays with @ and links with /
linkname = urllib.parse.quote(linkname, errors='surrogatepass')
displayname = html.escape(displayname, quote=False)
r.append(
'<li>'
'<input type="checkbox" name="files" value="{0}"/> ' # ADDED
'<a href="{0}">{1}</a></li>'
''.format(linkname, displayname)
)
r.append('</ul>')
r.append('<hr>')
r.append('<button>Download zip of checked files</button>') # ADDED
r.append('</form>') # ADDED
r.append('</body>')
r.append('</html>')
encoded = '\n'.join(r).encode(enc, 'surrogateescape')
f = io.BytesIO()
f.write(encoded)
f.seek(0)
self.send_response(HTTPStatus.OK)
self.send_header("Content-type", "text/html; charset=%s" % enc)
self.send_header("Content-Length", str(len(encoded)))
self.end_headers()
return f
def do_POST(self):
"""Return a zip of all the files specified in the POST data as a
stream"""
# Get the content length so the POST data can be read
try:
content_length = int(self.headers.get('Content-Length'))
if not content_length:
raise ValueError()
except (KeyError, ValueError, TypeError):
self.send_error(HTTPStatus.BAD_REQUEST, "Invalid content length")
return
# Read and decode the POST data
enc = sys.getfilesystemencoding()
try:
post_data = self.rfile.read(content_length).decode(enc)
except UnicodeDecodeError:
self.send_error(HTTPStatus.BAD_REQUEST, "Invalid encoding of POST data")
return
# Parse the filename(s) to add to the zip out of the POST data
try:
data = urllib.parse.parse_qs(post_data, strict_parsing=True)
except ValueError:
self.send_error(HTTPStatus.BAD_REQUEST, "No files selected")
return
# Generate the ZipStream from the POSTed filenames and send it as the
# response body.
# Note that since the ZipStream is sized, the total size of it can be
# calulated before starting to stream it. This is used to set the
# "Content-Length" header, giving the client the ability to show a
# download progress bar, estimate the time remaining, etc.
zs = ZipStream(sized=True)
zs.comment = "Generated by https://github.com/pR0Ps/zipstream-ng"
for x in data.get("files") or []:
with contextlib.suppress(OSError, ValueError):
zs.add_path(self.translate_path(os.path.join(self.path, x)))
# Don't send back an empty zip
if not zs:
self.send_error(HTTPStatus.BAD_REQUEST, "No files to zip up")
return
# Send response headers
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "application/zip")
self.send_header("Content-Disposition", "attachment; filename=files.zip")
self.send_header("Content-Length", len(zs))
self.send_header("Last-Modified", zs.last_modified)
self.end_headers()
# Generate the data of the ZipStream as it's sent to the client
self.wfile.writelines(zs)
def main():
import argparse
parser = argparse.ArgumentParser(description=(
"Simple fileserver with support for downloading multiple files and "
"folders as a single zip file."
))
parser.add_argument(
"--bind", "-b",
metavar="ADDRESS",
help="Specify alternate bind address [default: all interfaces]"
)
parser.add_argument(
"--directory", "-d",
default=os.getcwd(),
help="Specify alternative directory [default:current directory]"
)
parser.add_argument(
"port",
action="store",
default=8000,
type=int,
nargs="?",
help="Specify alternate port [default: 8000]"
)
args = parser.parse_args()
http.server.test(
HandlerClass=functools.partial(
ZippingHTTPRequestHandler,
directory=args.directory
),
ServerClass=http.server.ThreadingHTTPServer,
port=args.port,
bind=args.bind
)
if __name__ == "__main__":
main()
|