File: ntlm_auth.py

package info (click to toggle)
utopia-documents 2.4.4-2
  • links: PTS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 30,560 kB
  • ctags: 24,084
  • sloc: cpp: 179,735; ansic: 16,208; python: 13,446; xml: 1,937; sh: 1,918; ruby: 1,594; makefile: 527; sql: 6
file content (98 lines) | stat: -rw-r--r-- 4,278 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python
# encoding: utf-8
##############################################################################
##############################################################################
##############################################################################
###
### ntlm_auth.py
###
### This is for NTLM authentication.
###
##############################################################################
##############################################################################
##############################################################################

import re

from coda_network import urllib2
from coda_network.ntlm import ntlm

NTLM_HEADER = re.compile(r'NTLM(?: (.*))?', re.I)

class AbstractNtlmAuthHandler:
    def __init__(self, password_mgr=None, debuglevel=0):
        if password_mgr is None:
            password_mgr = urllib2.HTTPPasswordMgr()
        self.passwd = password_mgr
        self.add_password = self.passwd.add_password
        self._debuglevel = debuglevel

    def set_http_debuglevel(self, level):
        self._debuglevel = level

    def get_user_password(self, realm, req):
        return self.passwd.find_user_password(realm, req.get_full_url())

    def http_error_authentication_required(self, auth_header_field, req, fp, headers):
        auth_header_value = headers.get(auth_header_field, None)
        if auth_header_field:
            header_match = NTLM_HEADER.search(auth_header_value)
        if header_match:
            fp.close()
            challenge_message = header_match.group(1)
            if not challenge_message:
                # The first stage of authorization
                return self.authenticate_using_http_NTLM(req, auth_header_field, None, headers)
            else:
                # The second stage of authorization
                return self.authorize_using_http_NTLM(req, auth_header_field, None, headers)

    def authenticate_using_http_NTLM(self, req, auth_header_field, realm, headers):
        user, pw = self.get_user_password(realm, req)
        if pw is not None:
            auth = 'NTLM %s' % ntlm.create_NTLM_NEGOTIATE_MESSAGE(user)
            if req.headers.get(self.auth_header, None) == auth:
                return None
            req.add_header(self.auth_header, auth)
            # We need to use the parent to open and keep the connection because NTLM authenticates the connection, not single requests
            req.add_header('Connection', 'Keep-Alive')
            return self.parent.open(req)
        else:
            return None

    def authorize_using_http_NTLM(self, req, auth_header_field, realm, headers):
        user, pw = self.get_user_password(realm, req)
        if pw is not None:
            cookie = headers.get('set-cookie', None)
            if cookie:
                # this is important for some web applications that store authentication-related info in cookies (it took a long time to figure out)
                req.add_header('Cookie', cookie)
            auth_header_value = headers.get(auth_header_field, None)
            (ServerChallenge, NegotiateFlags) = ntlm.parse_NTLM_CHALLENGE_MESSAGE(auth_header_value[5:])
            user_parts = user.split('\\', 1)
            DomainName = user_parts[0].upper()
            UserName = user_parts[1]
            auth = 'NTLM %s' % ntlm.create_NTLM_AUTHENTICATE_MESSAGE(ServerChallenge, UserName, DomainName, pw, NegotiateFlags)
            req.add_header(self.auth_header, auth)
            return self.parent.open(req)
        else:
            return None

class HTTPNtlmAuthHandler(AbstractNtlmAuthHandler, urllib2.BaseHandler):

    auth_header = 'Authorization'

    def http_error_401(self, req, fp, code, msg, headers):
        return self.http_error_authentication_required('www-authenticate', req, fp, headers)


class ProxyNtlmAuthHandler(AbstractNtlmAuthHandler, urllib2.BaseHandler):

    auth_header = 'Proxy-authorization'

    def http_error_407(self, req, fp, code, msg, headers):
        req.proxy_connection_type = headers.get('Proxy-Connection', 'close')
        return self.http_error_authentication_required('proxy-authenticate', req, fp, headers)

    def get_user_password(self, realm, req):
        return self.passwd.find_user_password(realm, req.get_host())