1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
|
#!/usr/bin/env python
import sys, stat, urllib, mimetypes, posixpath, time
import tornado.httpserver
import tornado.ioloop
import tornado.web
from bup import options, git, vfs
from bup.helpers import *
handle_ctrl_c()
def _compute_breadcrumbs(path):
"""Returns a list of breadcrumb objects for a path."""
breadcrumbs = []
breadcrumbs.append(('[root]', '/'))
path_parts = path.split('/')[1:-1]
full_path = '/'
for part in path_parts:
full_path += part + '/'
breadcrumbs.append((part, full_path))
return breadcrumbs
def _contains_hidden_files(n):
"""Return True if n contains files starting with a '.', False otherwise."""
for sub in n:
name = sub.name
if len(name)>1 and name.startswith('.'):
return True
return False
def _compute_dir_contents(n, show_hidden=False):
"""Given a vfs node, returns an iterator for display info of all subs."""
for sub in n:
display = link = sub.name
if not show_hidden and len(display)>1 and display.startswith('.'):
continue
# link should be based on fully resolved type to avoid extra
# HTTP redirect.
if stat.S_ISDIR(sub.try_resolve().mode):
link = sub.name + "/"
size = None
if stat.S_ISDIR(sub.mode):
display = sub.name + '/'
elif stat.S_ISLNK(sub.mode):
display = sub.name + '@'
else:
size = sub.size()
yield (display, link, size)
class BupRequestHandler(tornado.web.RequestHandler):
def get(self, path):
return self._process_request(path)
def head(self, path):
return self._process_request(path)
def _process_request(self, path):
path = urllib.unquote(path)
print 'Handling request for %s' % path
try:
n = top.resolve(path)
except vfs.NoSuchFile:
self.send_error(404)
return
f = None
if stat.S_ISDIR(n.mode):
self._list_directory(path, n)
else:
self._get_file(path, n)
def _list_directory(self, path, n):
"""Helper to produce a directory listing.
Return value is either a file object, or None (indicating an
error). In either case, the headers are sent.
"""
if not path.endswith('/') and len(path) > 0:
print 'Redirecting from %s to %s' % (path, path + '/')
return self.redirect(path + '/', permanent=True)
try:
show_hidden = int(self.request.arguments.get('hidden', [0])[-1])
except ValueError, e:
show_hidden = False
self.render(
'list-directory.html',
path=path,
breadcrumbs=_compute_breadcrumbs(path),
files_hidden=_contains_hidden_files(n),
hidden_shown=show_hidden,
dir_contents=_compute_dir_contents(n, show_hidden),
# We need the standard url_escape so we don't escape /
url_escape=urllib.quote)
def _get_file(self, path, n):
"""Process a request on a file.
Return value is either a file object, or None (indicating an error).
In either case, the headers are sent.
"""
ctype = self._guess_type(path)
self.set_header("Last-Modified", self.date_time_string(n.mtime))
self.set_header("Content-Type", ctype)
size = n.size()
self.set_header("Content-Length", str(size))
if self.request.method != 'HEAD':
f = n.open()
for blob in chunkyreader(f):
self.write(blob)
f.close()
def _guess_type(self, path):
"""Guess the type of a file.
Argument is a PATH (a filename).
Return value is a string of the form type/subtype,
usable for a MIME Content-type header.
The default implementation looks the file's extension
up in the table self.extensions_map, using application/octet-stream
as a default; however it would be permissible (if
slow) to look inside the data to make a better guess.
"""
base, ext = posixpath.splitext(path)
if ext in self.extensions_map:
return self.extensions_map[ext]
ext = ext.lower()
if ext in self.extensions_map:
return self.extensions_map[ext]
else:
return self.extensions_map['']
if not mimetypes.inited:
mimetypes.init() # try to read system mime.types
extensions_map = mimetypes.types_map.copy()
extensions_map.update({
'': 'text/plain', # Default
'.py': 'text/plain',
'.c': 'text/plain',
'.h': 'text/plain',
})
def date_time_string(self, t):
return time.strftime('%a, %d %b %Y %H:%M:%S', time.gmtime(t))
optspec = """
bup web [[hostname]:port]
--
"""
o = options.Options('bup web', optspec)
(opt, flags, extra) = o.parse(sys.argv[1:])
if len(extra) > 1:
o.fatal("at most one argument expected")
address = ('127.0.0.1', 8080)
if len(extra) > 0:
addressl = extra[0].split(':', 1)
addressl[1] = int(addressl[1])
address = tuple(addressl)
git.check_repo_or_die()
top = vfs.RefList(None)
settings = dict(
debug = 1,
template_path = resource_path('web'),
)
# Disable buffering on stdout, for debug messages
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
application = tornado.web.Application([
(r"(/.*)", BupRequestHandler),
], **settings)
if __name__ == "__main__":
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(address[1], address=address[0])
print "Serving HTTP on %s:%d..." % http_server._socket.getsockname()
loop = tornado.ioloop.IOLoop.instance()
loop.start()
|