1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
#---------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
#---------------------------------------------------------------------------------------------
try:
import urllib.parse as parse
except ImportError:
import urlparse as parse # pylint: disable=import-error
class HttpChallenge(object):
    """ Parsed form of an HTTP WWW-Authenticate challenge returned by a server.

    Attributes set by the constructor:
      source_authority       network location (host[:port]) of the request URI
      source_uri             the request URI exactly as supplied
      scheme                 first token of the challenge, e.g. 'Bearer' or 'PoP'
      server_signature_key   'x-ms-message-signing-key' response header value, or None
      server_encryption_key  'x-ms-message-encryption-key' response header value, or None
    """

    def __init__(self, request_uri, challenge, response_headers=None):
        """ Parses an HTTP WWW-Authentication Bearer challenge from a server.

        :param request_uri: absolute HTTP or HTTPS URI of the request that was challenged
        :param challenge: challenge header value in the form '<scheme> name="value", ...'
        :param response_headers: optional mapping (anything with .get) of response headers,
            used to read the message signing and encryption keys
        :raises ValueError: if request_uri is empty, relative or not HTTP(S); if challenge is
            empty; or if the challenge carries neither 'authorization' nor 'authorization_uri'
        """
        self.source_authority = self._validate_request_uri(request_uri)
        self.source_uri = request_uri
        self._parameters = {}

        # always define the key attributes so supports_message_protection() is safe to call
        # even when no response headers were supplied
        self.server_signature_key = None
        self.server_encryption_key = None

        # get the scheme of the challenge and remove from the challenge string; partition
        # yields an empty remainder (rather than an IndexError) for a scheme-only challenge
        trimmed_challenge = self._validate_challenge(challenge)
        self.scheme, _, trimmed_challenge = trimmed_challenge.partition(' ')

        # split trimmed challenge into comma-separated name=value pairs. Values are expected
        # to be surrounded by quotes which are stripped here.
        for item in trimmed_challenge.split(','):
            # split on the first '=' only, so values that themselves contain '='
            # (e.g. a URI with a query string) are kept instead of silently dropped
            key, separator, value = item.partition('=')
            if not separator:
                continue
            key = key.strip(' "')
            if key:
                self._parameters[key] = value.strip(' "')

        # must specify authorization or authorization_uri (this also rejects a challenge
        # from which no parameters at all could be parsed)
        if 'authorization' not in self._parameters and 'authorization_uri' not in self._parameters:
            raise ValueError('Invalid challenge parameters')

        # if the response headers were supplied
        if response_headers:
            # get the message signing key and message key encryption key from the headers
            self.server_signature_key = response_headers.get('x-ms-message-signing-key', None)
            self.server_encryption_key = response_headers.get('x-ms-message-encryption-key', None)

    def is_bearer_challenge(self):
        """ Tests whether the HttpChallenge is a Bearer challenge (scheme compared case-insensitively).
        rtype: bool """
        if not self.scheme:
            return False
        return self.scheme.lower() == 'bearer'

    def is_pop_challenge(self):
        """ Tests whether the HttpChallenge is a proof of possession challenge
        (scheme compared case-insensitively).
        rtype: bool """
        if not self.scheme:
            return False
        return self.scheme.lower() == 'pop'

    def get_value(self, key):
        """ Returns the value of the named challenge parameter, or None if it is not present. """
        return self._parameters.get(key)

    def get_authorization_server(self):
        """ Returns the URI for the authorization server if present, otherwise empty string.
        'authorization_uri' takes precedence over 'authorization'. """
        return self.get_value('authorization_uri') or self.get_value('authorization') or ''

    def get_resource(self):
        """ Returns the resource if present, otherwise empty string. """
        return self.get_value('resource') or ''

    def get_scope(self):
        """ Returns the scope if present, otherwise empty string. """
        return self.get_value('scope') or ''

    def supports_pop(self):
        """ Returns True if challenge supports pop token auth else False """
        return self._parameters.get('supportspop', '').lower() == 'true'

    def supports_message_protection(self):
        """ Returns True if challenge vault supports message protection, i.e. the challenge
        supports pop and both server keys were present in the response headers; else False """
        return bool(self.supports_pop() and self.server_encryption_key and self.server_signature_key)

    def _validate_challenge(self, challenge):
        """ Verifies that the challenge is non-empty and returns it with surrounding
        whitespace removed. Raises ValueError if the challenge is empty. """
        if not challenge:
            raise ValueError('Challenge cannot be empty')
        return challenge.strip()

    # pylint: disable=no-self-use
    def _validate_request_uri(self, uri):
        """ Extracts the host authority from the given URI.
        Raises ValueError if the URI is empty, has no network location, or is not HTTP(S). """
        if not uri:
            raise ValueError('request_uri cannot be empty')
        parsed = parse.urlparse(uri)
        if not parsed.netloc:
            raise ValueError('request_uri must be an absolute URI')
        if parsed.scheme.lower() not in ('http', 'https'):
            raise ValueError('request_uri must be HTTP or HTTPS')
        return parsed.netloc
|