File: test_ntlmauth.py

package info (click to toggle)
python-ntlm 1.1.0-1
  • links: PTS
  • area: main
  • in suites: buster, stretch
  • size: 240 kB
  • ctags: 323
  • sloc: python: 2,664; makefile: 9
file content (211 lines) | stat: -rw-r--r-- 7,924 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
"""\
Demonstrate various defects (or their repair!) in the ntml module.
"""


from StringIO import StringIO
import httplib
import inspect, os, sys
import traceback
import urllib2
try:
    from ntlm import HTTPNtlmAuthHandler
except ImportError:
    # assume ntlm is in the directory "next door"
    ntlm_folder = os.path.realpath(os.path.join(
        os.path.dirname(inspect.getfile(inspect.currentframe())),
        '..'))
    sys.path.insert(0, ntlm_folder)
    from ntlm import HTTPNtlmAuthHandler


# The headers seen during an initial NTML rejection.
initial_rejection = '''HTTP/1.1 401 Unauthorized
Server: Apache-Coyote/1.1
WWW-Authenticate: NTLM
Connection: close
Date: Tue, 03 Feb 2009 11:47:33 GMT
Connection: close

'''


# The headers and data seen following a successful NTML connection.
eventual_success = '''HTTP/1.1 200 OK
Server: Apache-Coyote/1.1
WWW-Authenticate: NTLM TlRMTVNTUAACAAAABAAEADgAAAAFgomi3k7KRx+HGYQAAAAAAAAAALQAtAA8AAAABgGwHQAAAA9OAEEAAgAEAE4AQQABABYATgBBAFMAQQBOAEUAWABIAEMAMAA0AAQAHgBuAGEALgBxAHUAYQBsAGMAbwBtAG0ALgBjAG8AbQADADYAbgBhAHMAYQBuAGUAeABoAGMAMAA0AC4AbgBhAC4AcQB1AGEAbABjAG8AbQBtAC4AYwBvAG0ABQAiAGMAbwByAHAALgBxAHUAYQBsAGMAbwBtAG0ALgBjAG8AbQAHAAgADXHouNLjzAEAAAAA
Date: Tue, 03 Feb 2009 11:47:33 GMT
Connection: close

Hello, world!'''


# A collection of transactions representing various defects in NTLM
# processing. Each is indexed according the the issues number recorded
# for the defect at code.google.com, and consists of a series of server
# responses that should be seen as a connection is attempted.
issues = {
    27: [
        initial_rejection,
        '''HTTP/1.1 401 Unauthorized
Server: Apache-Coyote/1.1
WWW-Authenticate: NTLM TlRMTVNTUAACAAAABAAEADgAAAAFgomi3k7KRx+HGYQAAAAAAAAAALQAtAA8AAAABgGwHQAAAA9OAEEAAgAEAE4AQQABABYATgBBAFMAQQBOAEUAWABIAEMAMAA0AAQAHgBuAGEALgBxAHUAYQBsAGMAbwBtAG0ALgBjAG8AbQADADYAbgBhAHMAYQBuAGUAeABoAGMAMAA0AC4AbgBhAC4AcQB1AGEAbABjAG8AbQBtAC4AYwBvAG0ABQAiAGMAbwByAHAALgBxAHUAYQBsAGMAbwBtAG0ALgBjAG8AbQAHAAgADXHouNLjzAEAAAAA
WWW-Authenticate: Negotiate
Content-Length: 0
Date: Tue, 03 Feb 2009 11:47:33 GMT
Connection: close

''',
        eventual_success,
        ],
    28: [
        initial_rejection,
        '''HTTP/1.1 401 Unauthorized
Server: Apache-Coyote/1.1
WWW-Authenticate: NTLM TlRMTVNTUAACAAAAAAAAAAAAAAABAgAAO/AU3OJc3g0=
Content-Length: 0
Date: Tue, 03 Feb 2009 11:47:33 GMT
Connection: close

''',
        eventual_success,
        ],
    }


class FakeSocket(StringIO):
    '''Extends StringIO just enough to look like a socket.'''
    def makefile(self, *args, **kwds):
        '''The instance already looks like a file.'''
        return self
    def sendall(self, *args, **kwds):
        '''Ignore any data that may be sent.'''
        pass
    def close(self):
        '''Ignore any calls to close.'''
        pass


class FakeHTTPConnection(httplib.HTTPConnection):
    '''Looks like a normal HTTPConnection, but returns a FakeSocket.
    The connection's port number is used to choose a set of transactions
    to replay to the user.  A class static variable is used to track
    how many transactions have been replayed.'''
    attempt = {}
    def connect(self):
        '''Returns a FakeSocket containing the data for a single
        transaction.'''
        nbr = self.attempt.setdefault(self.port, 0)
        self.attempt[self.port] = nbr + 1
        print 'connecting to %s:%s (attempt %s)' % (self.host, self.port, nbr)
        self.sock = FakeSocket(issues[self.port][nbr])


class FakeHTTPHandler(urllib2.HTTPHandler):
    connection = FakeHTTPConnection
    def http_open(self, req):
        print 'opening', self.connection
        return self.do_open(self.connection, req)


def process(*issue_nbrs):
    '''Run the specified tests, or all of them.'''

    if issue_nbrs:
        # Make sure the tests are ints.
        issue_nbrs = map(int, issue_nbrs)
    else:
        # If no tests were specified, run them all.
        issue_nbrs = issues.keys()

    assert all(i in issues for i in issue_nbrs)

    user = 'DOMAIN\User'
    password = "Password"
    url = "http://www.example.org:%s/"

    # Set passwords for each of the "servers" to which we will be connecting.
    # Each distinct port on a server requires it's own set of credentials.
    passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
    for k in issue_nbrs:
        passman.add_password(None, url % k, user, password)

    # Create the NTLM authentication handler.
    auth_NTLM = HTTPNtlmAuthHandler.HTTPNtlmAuthHandler(passman, debuglevel=0)

    # Create and install openers for both the NTLM Auth handler and
    # our fake HTTP handler.
    opener = urllib2.build_opener(auth_NTLM, FakeHTTPHandler)
    urllib2.install_opener(opener)

    # The following is a massive kludge; let me explain why it is needed.
    HTTPNtlmAuthHandler.httplib.HTTPConnection = FakeHTTPConnection
    # At the heart of the urllib2 module is the opener director. Whenever a
    # URL is opened, the director is responsible for locating the proper
    # handler for the protocol specified in the URL. Frequently, an existing
    # protocol handler will be subclassed and then added to the collection
    # maintained by the director. When urlopen is called, the specified
    # request is immediately handed off to the director's "open" method
    # which finds the correct handler and invokes the protocol-specific
    # XXX_open method. At least in the case of the HTTP protocols, if an
    # error occurs then the director is called again to find and invoke a
    # handler for the error; these handlers generally open a new connection
    # after adding headers to avoid the error going forward. Finally, it is
    # important to note that at the present time, the HTTP handlers in
    # urllib2 are built using a class that isn't prepared to deal with a
    # persistent connection, so they always add a "Connection: close" header
    # to the request.
    # 
    # Unfortunately, NTLM only certifies the current connection, meaning
    # that  a "Connection: keep-alive" header must be used to keep it open
    # throughout the authentication process. Furthermore, because the opener
    # director only provides a do_open method, there is no way to discover
    # the type of connection without also opening it. This means that the
    # HTTPNtlmAuthHandler cannot use the normal HTTPHandler and must
    # therefore must hardcode the HTTPConnection class. If a custom class is
    # required for whatever reason, the only way to cause it to be used is
    # to monkey-patch the code, as is done in the line above.

    for i in sorted(issue_nbrs):
        print '\nissue %d' % i
        try:
            f = urllib2.urlopen(url % i)
        except:
            traceback.print_exc()
        else:
            print f.read()


# The following is adapted from Guido van van Rossum's suggestion.
# http://www.artima.com/weblogs/viewpost.jsp?thread=4829

import sys
import getopt

class Usage(Exception):
    def __init__(self, msg):
        self.msg = msg

def main(argv=None):
    """Usage:  %s"""
    if argv is None:
        argv = sys.argv
    try:
        try:
            opts, args = getopt.getopt(argv[1:], "h", ["help"])
        except getopt.error, msg:
             raise Usage(msg)
        if opts:
            raise Usage(main.func_doc)
        if len(args) > 0:
            raise Usage('takes no arguments (%d given)' % len(args))
        process(*args)
    except Usage, err:
        print >>sys.stderr, err.msg
        if err.msg is not main.func_doc:
            print >>sys.stderr, "for help use --help"
        return 2
main.func_doc %= os.path.basename(sys.argv[0])

if __name__ == "__main__":
    sys.exit(main())