File: http_bearer_challenge.py

package info (click to toggle)
python-azure 20181112%2Bgit-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 407,300 kB
  • sloc: python: 717,190; makefile: 201; sh: 76
file content (98 lines) | stat: -rw-r--r-- 3,752 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#---------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
#---------------------------------------------------------------------------------------------

try:
    import urllib.parse as parse
except ImportError:
    import urlparse as parse # pylint: disable=import-error


class HttpBearerChallenge(object):
    """ Parsed form of an HTTP Bearer authentication challenge.

    The challenge text is expected to look like
    ``Bearer name1="value1", name2="value2"``. The name/value pairs are kept in
    an internal dictionary and exposed through :meth:`get_value` and the
    convenience accessors.

    Attributes set by the constructor:
      * ``source_authority`` - the network location (netloc) of ``request_uri``
      * ``source_uri`` - the ``request_uri`` exactly as it was passed in
    """

    def __init__(self, request_uri, challenge):
        """ Parses an HTTP WWW-Authentication Bearer challenge from a server.

        :param request_uri: absolute HTTP or HTTPS URI of the challenged request
        :param challenge: the challenge text, which must start with 'Bearer '
        :raises ValueError: if ``request_uri`` is empty, not absolute or not
            HTTP/HTTPS; if ``challenge`` is empty or not a Bearer challenge; or
            if the challenge carries neither an ``authorization`` nor an
            ``authorization_uri`` parameter
        """
        self.source_authority = self._validate_request_uri(request_uri)
        self.source_uri = request_uri
        self._parameters = {}

        trimmed_challenge = self._validate_challenge(challenge)

        # Split the trimmed challenge into comma-separated name=value pairs. Values are
        # expected to be surrounded by quotes, which are stripped here.
        # NOTE: a value that itself contains a comma is still broken apart by this split.
        for item in trimmed_challenge.split(','):
            # Split on the FIRST '=' only. Values such as URIs with a query string
            # ("https://host/path?a=b") or base64 padding ("abc==") contain '=' themselves
            # and must not cause the whole parameter to be discarded.
            name, separator, raw_value = item.partition('=')
            if not separator:
                # not a name=value pair
                continue

            key = name.strip(' "')
            if key:
                self._parameters[key] = raw_value.strip(' "')

        # minimum set of parameters
        if not self._parameters:
            raise ValueError('Invalid challenge parameters')

        # must specify authorization or authorization_uri
        if 'authorization' not in self._parameters and 'authorization_uri' not in self._parameters:
            raise ValueError('Invalid challenge parameters')

    # pylint: disable=no-self-use
    @staticmethod
    def is_bearer_challenge(authentication_header):
        """ Tests whether an authentication header is a Bearer challenge.

        :param authentication_header: the authentication header to test
        :rtype: bool
        :return: True if the header, ignoring surrounding whitespace, starts with
            'Bearer '; False otherwise (including for an empty or None header)
        """
        if not authentication_header:
            return False

        return authentication_header.strip().startswith('Bearer ')

    def get_value(self, key):
        """ Returns the value of the challenge parameter ``key``, or None if it is absent. """
        return self._parameters.get(key)

    def get_authorization_server(self):
        """ Returns the URI for the authorization server if present, otherwise empty string.

        ``authorization_uri`` takes precedence over ``authorization``; an empty
        value is treated the same as a missing one.
        """
        return self.get_value('authorization_uri') or self.get_value('authorization') or ''

    def get_resource(self):
        """ Returns the resource if present, otherwise empty string. """
        return self.get_value('resource') or ''

    def get_scope(self):
        """ Returns the scope if present, otherwise empty string. """
        return self.get_value('scope') or ''

    # pylint: disable=no-self-use
    def _validate_challenge(self, challenge):
        """ Verifies that the challenge is a Bearer challenge and returns the key=value pairs.

        :param challenge: the raw challenge text
        :return: the challenge with surrounding whitespace and the leading
            'Bearer ' prefix removed
        :raises ValueError: if the challenge is empty or does not start with 'Bearer '
        """
        bearer_string = 'Bearer '
        if not challenge:
            raise ValueError('Challenge cannot be empty')

        challenge = challenge.strip()
        if not challenge.startswith(bearer_string):
            raise ValueError('Challenge is not Bearer')

        return challenge[len(bearer_string):]

    # pylint: disable=no-self-use
    def _validate_request_uri(self, uri):
        """ Extracts the host authority from the given URI.

        :param uri: the request URI to validate
        :return: the network location (netloc) component of ``uri``
        :raises ValueError: if ``uri`` is empty, has no network location, or its
            scheme is not http or https (compared case-insensitively)
        """
        if not uri:
            raise ValueError('request_uri cannot be empty')

        uri = parse.urlparse(uri)
        if not uri.netloc:
            raise ValueError('request_uri must be an absolute URI')

        if uri.scheme.lower() not in ['http', 'https']:
            raise ValueError('request_uri must be HTTP or HTTPS')

        return uri.netloc