File: oauth_auth.py

package info (click to toggle)
utopia-documents 2.4.4-2
  • links: PTS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 30,560 kB
  • ctags: 24,084
  • sloc: cpp: 179,735; ansic: 16,208; python: 13,446; xml: 1,937; sh: 1,918; ruby: 1,594; makefile: 527; sql: 6
file content (82 lines) | stat: -rw-r--r-- 3,225 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env python
# encoding: utf-8
##############################################################################
##############################################################################
##############################################################################
###
### oauth_auth.py
###
### This is for OAUTH authentication.
###
##############################################################################
##############################################################################
##############################################################################

import re

from coda_network import urllib2
from coda_network.oauth import get_oauth_header, AUTH_HEADER_OAUTH

class AbstractOauthAuthHandler:

    # allow for double- and single-quoted realm values
    # (single quotes are a violation of the RFC, but appear in the wild)
    #rx = re.compile('(?:.*,)*[ \t]*([^ \t]+)[ \t]+'
    #                'realm=(["\'])(.*?)\\2', re.I)
    rx = re.compile('([^\"\'=]*)=[\"\']([^\"\'=]*)[\"\'][ \t,]*', re.IGNORECASE)
    def __init__(self, password_mgr=None):
        if password_mgr is None:
            password_mgr = urllib2.HTTPPasswordMgr()
        self.passwd = password_mgr
        self.add_password = self.passwd.add_password
        self.retried = 0

    def http_error_auth_reqed(self, authreq, host, req, headers):
        # host may be an authority (without userinfo) or a URL with an
        # authority
        # XXX could be multiple headers
        authreq = headers.get(authreq, None)

        if self.retried > 2:
            # retry sending the username:password 5 times before failing.
            raise urllib2.HTTPError(req.get_full_url(), 401, "oauth auth failed",
                            headers, None)
        else:
            self.retried += 1

        if authreq:
            auth_items = authreq.split(' ')
            scheme = auth_items[0].strip().lower()
            auth_dict = {}
            for auth in auth_items[1:]:
                mo = AbstractOauthAuthHandler.rx.search(auth)
                if mo:
                    name, val = mo.groups()
                    auth_dict[name] = val
            if scheme.lower() == 'oauth':
                return self.retry_http_oauth_auth(host, req, auth_dict.get('realm', ''))

    def retry_http_oauth_auth(self, host, req, realm):
        auth = self.generate_oauth_header(host, req)
        if auth:
            req.add_unredirected_header(self.auth_header, auth)
            return self.parent.open(req, timeout=req.timeout)
        else:
            return None

    def generate_oauth_header(self, host, req):
        consumer, token = self.passwd.find_user_password(None, host)
        if consumer is not None:
            auth = get_oauth_header(req, consumer, token)
            if req.headers.get(self.auth_header, None) != auth:
                return auth
        return None

class HTTPOauthAuthHandler(AbstractOauthAuthHandler, urllib2.BaseHandler):

    auth_header = AUTH_HEADER_OAUTH

    def http_error_401(self, req, fp, code, msg, headers):
        url = req.get_full_url()
        return self.http_error_auth_reqed('www-authenticate',
                                          url, req, headers)