File: basic.py

package info (click to toggle)
python-couchdbkit 0.6.5-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 640 kB
  • ctags: 1,257
  • sloc: python: 7,240; makefile: 2
file content (103 lines) | stat: -rw-r--r-- 3,750 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# -*- coding: utf-8 -*-
#
# This file is part of couchdbkit released under the MIT license. 
# See the NOTICE for more information.

import logging
from paste.request import parse_dict_querystring, parse_formvars
from paste.httpexceptions import HTTPUnauthorized
from paste.httpheaders import CONTENT_LENGTH, CONTENT_TYPE
from repoze.what.middleware import setup_auth
from repoze.who.plugins.auth_tkt import AuthTktCookiePlugin
from repoze.who.interfaces import IChallenger, IIdentifier

import sys
from zope.interface import implements

from .adapters import GroupAdapter, PermissionAdapter, \
Authenticator, MDPlugin


class BasicAuth(object):
    """A basic challenger and identifier.

    Credentials are read from the ``login`` and ``password`` request
    parameters submitted to ``login_url``; a request to ``logout_url``
    installs an unauthorized response.  Remembering and forgetting an
    identity is delegated to the plugin stored under the ``'cookie'`` key
    of ``environ['repoze.who.plugins']``.
    """
    implements(IChallenger, IIdentifier)

    def __init__(self, login_url="/user/login", logout_url="/user/logout"):
        """Store the paths that ``identify`` compares against ``PATH_INFO``.

        :param login_url: path on which credentials are extracted.
        :param logout_url: path on which a logout is triggered.
        """
        self._login_url = login_url
        self._logout_url = logout_url

    def identify(self, environ):
        """Extract credentials from a login request.

        * On ``logout_url``: stores an ``HTTPUnauthorized`` instance in
          ``environ['repoze.who.application']`` and returns ``None``.
        * On ``login_url``: merges the query-string values on top of the
          parsed form variables, stores a small WSGI application that
          answers with a JSON body in ``environ['repoze.who.application']``
          and returns ``{'login': ..., 'password': ...}``, or ``None``
          when either field is missing.
        * On any other path: returns ``None`` and leaves ``environ`` alone.

        :param environ: the WSGI environment of the current request.
        :returns: a credentials dict or ``None``.
        """
        # PEP 3333 allows PATH_INFO to be omitted when it would be empty,
        # so a direct ``environ['PATH_INFO']`` lookup could raise KeyError.
        path_info = environ.get('PATH_INFO', '')

        # This will handle the logout request.
        if path_info == self._logout_url:
            # set in environ for self.challenge() to find later
            environ['repoze.who.application'] = HTTPUnauthorized()
            return None

        if path_info != self._login_url:
            return None

        # Only the login branch needs the request parameters, so parsing
        # is deferred until here instead of being done for every request.
        query = parse_dict_querystring(environ)
        form = parse_formvars(environ)
        form.update(query)
        try:
            credentials = {
                'login': form['login'],
                'password': form['password']
            }
        except KeyError:
            credentials = None

        def auth_resp(environ, start_response):
            """WSGI application sent back in place of the wrapped app."""
            import json
            # NOTE(review): this body reports success unconditionally,
            # including when ``credentials`` is None -- confirm that
            # clients do not rely on it to detect a failed login.
            resp = {
                "success": True
            }

            resp_str = json.dumps(resp)

            content_length = CONTENT_LENGTH.tuples(str(len(resp_str)))
            content_type = CONTENT_TYPE.tuples('application/json')
            headers = content_length + content_type
            start_response('200 OK', headers)
            return [resp_str]

        environ['repoze.who.application'] = auth_resp
        return credentials

    def challenge(self, environ, status, app_headers, forget_headers):
        """Return the WSGI application used to challenge the client.

        When ``forget_headers`` is empty an ``HTTPUnauthorized`` instance is
        returned.  Otherwise the returned application answers ``200 OK``
        with a short text body and sends ``forget_headers`` along with the
        content headers.

        :param environ: the WSGI environment (not used here).
        :param status: status of the downstream response (not used here).
        :param app_headers: headers of the downstream response (not used
            here).
        :param forget_headers: list of ``(name, value)`` header tuples to
            append to the challenge response.
        :returns: a WSGI application.
        """
        if not forget_headers:
            return HTTPUnauthorized()

        def auth_form(environ, start_response):
            """WSGI application that emits the challenge body."""
            towrite = "Challenging this"
            content_length = CONTENT_LENGTH.tuples(str(len(towrite)))
            content_type = CONTENT_TYPE.tuples('text/html')
            headers = content_length + content_type + forget_headers
            start_response('200 OK', headers)
            return [towrite]
        return auth_form

    def remember(self, environ, identity):
        """Delegate to the ``'cookie'`` plugin's ``remember`` and return its result."""
        return environ['repoze.who.plugins']['cookie'].remember(environ, identity)

    def forget(self, environ, identity):
        """Delegate to the ``'cookie'`` plugin's ``forget`` and return its result."""
        return environ['repoze.who.plugins']['cookie'].forget(environ, identity)

def AuthBasicMiddleware(app, conf, user_class):
    """Wrap *app* with the authentication middleware built by ``setup_auth``.

    :param app: the WSGI application to protect.
    :param conf: mapping read for the ``"couchdb.db"`` and
        ``'cookies.secret'`` keys.
    :param user_class: passed to the group adapter, the authenticator and
        the metadata provider.
    :returns: whatever ``setup_auth`` returns for the given configuration.
    """
    group_adapters = {'all_groups': GroupAdapter(user_class)}
    permission_adapters = {
        'all_perms': PermissionAdapter(conf["couchdb.db"]),
    }

    # A single BasicAuth instance acts as both challenger and identifier;
    # the auth_tkt plugin is registered as 'cookie', the key BasicAuth
    # looks up when remembering or forgetting an identity.
    auth_plugin = BasicAuth()
    cookie_plugin = AuthTktCookiePlugin(conf['cookies.secret'])

    return setup_auth(
        app,
        group_adapters,
        permission_adapters,
        authenticators=[('accounts', Authenticator(user_class))],
        challengers=[('basicauth', auth_plugin)],
        identifiers=[('basicauth', auth_plugin), ('cookie', cookie_plugin)],
        mdproviders=[('accounts', MDPlugin(user_class))],
        # Logging goes to stdout at DEBUG level, as before.
        log_stream=sys.stdout,
        log_level=logging.DEBUG
    )