File: http_proxy_digest.py

package info (click to toggle)
python-requests-toolbelt 1.0.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 876 kB
  • sloc: python: 3,653; makefile: 166; sh: 7
file content (103 lines) | stat: -rw-r--r-- 3,706 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# -*- coding: utf-8 -*-
"""The module containing HTTPProxyDigestAuth."""
import re

from requests import cookies, utils

from . import _digest_auth_compat as auth


class HTTPProxyDigestAuth(auth.HTTPDigestAuth):
    """HTTP digest authentication against a proxy.

    Answers ``407`` responses whose ``proxy-authenticate`` header carries a
    digest challenge by resending the request with a ``Proxy-Authorization``
    header built from that challenge.

    :ivar stale_rejects: how many times a request that already carried
        ``Proxy-Authorization`` was rejected with ``stale=true``, i.e. the
        proxy indicated that the client may simply retry the request with a
        new encrypted response, without reprompting the user for a new
        username and password. Once the counter reaches 2, 407 responses
        are handed back unhandled.
    :vartype stale_rejects: int
    """
    # Matches the leading "Digest " scheme token of a challenge header,
    # in any letter case.
    _pat = re.compile(r'digest ', flags=re.IGNORECASE)

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent class and zero the counter."""
        super(HTTPProxyDigestAuth, self).__init__(*args, **kwargs)
        self.stale_rejects = 0

        self.init_per_thread_state()

    @property
    def stale_rejects(self):
        """Return the stale-reject counter.

        The counter lives on ``self._thread_local`` when that attribute
        exists and on the instance itself (``_stale_rejects``) otherwise.
        """
        thread_local = getattr(self, '_thread_local', None)
        if thread_local is None:
            return self._stale_rejects
        # __init__ only seeds the counter on ``_thread_local`` from the
        # thread that constructed the object. If that object keeps separate
        # attributes per thread (presumably a ``threading.local`` -- not
        # visible from this module), other threads have no counter yet and
        # must start from zero instead of failing with AttributeError.
        return getattr(thread_local, 'stale_rejects', 0)

    @stale_rejects.setter
    def stale_rejects(self, value):
        thread_local = getattr(self, '_thread_local', None)
        if thread_local is None:
            self._stale_rejects = value
        else:
            thread_local.stale_rejects = value

    def init_per_thread_state(self):
        """Run the parent's ``init_per_thread_state`` when it provides one."""
        # If we're not on requests 2.8.0+ this method does not exist.
        # Looking it up explicitly (rather than catching AttributeError
        # around the call) keeps genuine AttributeErrors raised *inside*
        # the parent implementation visible.
        parent_init = getattr(
            super(HTTPProxyDigestAuth, self), 'init_per_thread_state', None)
        if parent_init is not None:
            parent_init()

    def handle_407(self, r, **kwargs):
        """Answer a 407 digest challenge by resending the request.

        Responses that are not 407, that carry a non-digest challenge, or
        that arrive after ``stale_rejects`` has reached 2 are returned
        unchanged.

        :param r: current response
        :param kwargs: passed through to ``r.connection.send``
        :returns: the response to the resent request, with ``r`` appended
            to its history, or ``r`` itself when it is not handled
        :raises IOError: if the 407 response has no ``proxy-authenticate``
            header, or if credentials were already sent and the challenge
            says ``stale=false``
        """
        if r.status_code != 407 or self.stale_rejects >= 2:
            # give up authenticate
            return r

        s_auth = r.headers.get("proxy-authenticate")
        if s_auth is None:
            raise IOError(
                "proxy server violated RFC 7235:"
                "407 response MUST contain header proxy-authenticate")
        if not self._pat.match(s_auth):
            # Not a digest challenge; leave the response untouched.
            return r

        # Drop the scheme token and parse the remaining key=value pairs.
        self.chal = utils.parse_dict_header(
            self._pat.sub('', s_auth, count=1))

        # if we present the user/passwd and still get rejected
        # https://tools.ietf.org/html/rfc2617#section-3.2.1
        if ('Proxy-Authorization' in r.request.headers and
                'stale' in self.chal):
            stale = self.chal['stale'].lower()
            if stale == 'true':  # try again
                self.stale_rejects += 1
            elif stale == 'false':  # wrong user/passwd
                raise IOError("User or password is invalid")

        # Consume content and release the original connection
        # to allow our new request to reuse the same one.
        r.content
        r.close()
        prep = r.request.copy()
        # Carry over any cookies set by the 407 response.
        cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
        prep.prepare_cookies(prep._cookies)

        prep.headers['Proxy-Authorization'] = self.build_digest_header(
            prep.method, prep.url)
        _r = r.connection.send(prep, **kwargs)
        _r.history.append(r)
        _r.request = prep

        return _r

    def __call__(self, r):
        """Prepare request ``r`` for proxy digest authentication.

        :param r: the request being prepared
        :returns: ``r``, with ``handle_407`` registered as a response hook
            and, when a nonce is already known, a ``Proxy-Authorization``
            header attached
        """
        self.init_per_thread_state()
        # if we have nonce, then just use it, otherwise server will tell us
        if self.last_nonce:
            r.headers['Proxy-Authorization'] = self.build_digest_header(
                r.method, r.url
            )
        r.register_hook('response', self.handle_407)
        return r