1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
# -*- coding: utf-8 -*-
import binascii
from trac.core import Component, implements
from trac.config import ListOption, Option
from trac.web.api import IRequestFilter, RequestDone, IAuthenticator
from acct_mgr.api import AccountManager
__all__ = ['HTTPAuthFilter']
def _escape_quotes(text):
return text.replace('\\', r'\\').replace('"', r'\"')
class HTTPAuthFilter(Component):
"""Request filter and handler to provide HTTP authentication."""
realm = Option('httpauth', 'realm', default='Control Panel',
doc='HTTP authentication realm')
paths = ListOption('httpauth', 'paths',
default='/login/xmlrpc,/login/jsonrpc',
doc='Paths to force HTTP authentication on.')
formats = ListOption('httpauth', 'formats',
doc='Request formats to force HTTP authentication on')
implements(IRequestFilter, IAuthenticator)
# IRequestFilter methods
def pre_process_request(self, req, handler):
check = req.path_info.startswith(tuple(self.paths)) or \
req.args.get('format') in self.formats
if check and not self._check_password(req):
self.log.info(
'HTTPAuthFilter: No/bad authentication data given, returing 403')
return self
return handler
def post_process_request(self, req, template, data, content_type):
return template, data, content_type
# IRequestHandler methods (sort of)
def process_request(self, req):
if req.session:
req.session.save() # Just in case
auth_req_msg = b'Authentication required'
req.send_response(401)
req.send_header('WWW-Authenticate',
'Basic realm="%s"' % _escape_quotes(self.realm))
req.send_header('Content-Type', 'text/plain')
req.send_header('Pragma', 'no-cache')
req.send_header('Cache-control', 'no-cache')
req.send_header('Expires', 'Fri, 01 Jan 1999 00:00:00 GMT')
req.send_header('Content-Length', str(len(auth_req_msg)))
if req.get_header('Content-Length'):
req.send_header('Connection', 'close')
req.end_headers()
if req.method != 'HEAD':
req.write(auth_req_msg)
raise RequestDone
# IAuthenticator methods
def authenticate(self, req):
user = self._check_password(req)
if user:
req.environ['REMOTE_USER'] = user
self.log.debug('HTTPAuthFilter: Authentication okay for %s', user)
return user
# Internal methods
def _check_password(self, req):
header = req.get_header('Authorization')
if not header:
return None
values = header.split()
if values[0].lower() != 'basic':
return None
if len(values) != 2:
return None
try:
creds = binascii.a2b_base64(values[1])
except binascii.Error:
return None
creds = creds.decode('latin1')
if ':' not in creds:
return None
user, passwd = creds.split(':', 1)
if AccountManager(self.env).check_password(user, passwd):
return user
|