# OpenID and OpenID Connect authentication backends.
import datetime
from calendar import timegm
from jwt import DecodeError, ExpiredSignature, decode as jwt_decode
from openid.consumer.consumer import Consumer, SUCCESS, CANCEL, FAILURE
from openid.consumer.discover import DiscoveryFailure
from openid.extensions import sreg, ax, pape
from social.utils import url_add_parameters
from social.exceptions import AuthException, AuthFailed, AuthCanceled, \
AuthUnknownError, AuthMissingParameter, \
AuthTokenError
from social.backends.base import BaseAuth
from social.backends.oauth import BaseOAuth2
# OpenID configuration
# Each attribute table below is a list of ``(source, alias)`` pairs: the
# first item is the name/URI requested from the provider, the second is the
# key under which the value is stored locally.

# Legacy Attribute Exchange schema URIs. The ``old_`` alias prefix is
# stripped when values are read back from a response.
OLD_AX_ATTRS = [
    ('http://schema.openid.net/contact/email', 'old_email'),
    ('http://schema.openid.net/namePerson', 'old_fullname'),
    ('http://schema.openid.net/namePerson/friendly', 'old_nickname'),
]

# Current Attribute Exchange schema URIs.
AX_SCHEMA_ATTRS = [
    # Request both the full name and first/last components since some
    # providers offer one but not the other.
    ('http://axschema.org/contact/email', 'email'),
    ('http://axschema.org/namePerson', 'fullname'),
    ('http://axschema.org/namePerson/first', 'first_name'),
    ('http://axschema.org/namePerson/last', 'last_name'),
    ('http://axschema.org/namePerson/friendly', 'nickname'),
]

# Simple Registration field names.
SREG_ATTR = [
    ('email', 'email'),
    ('fullname', 'fullname'),
    ('nickname', 'nickname'),
]

# Request parameter that carries the user-supplied OpenID provider URL.
OPENID_ID_FIELD = 'openid_identifier'

# Key passed to the strategy when asking for the OpenID session dict.
SESSION_NAME = 'openid'
class OpenIdAuth(BaseAuth):
    """Generic OpenID authentication backend.

    Subclasses normally only set ``name`` and ``URL``; when ``URL`` is left
    as ``None`` the provider URL is read from the request data under
    ``OPENID_ID_FIELD``.
    """
    # Backend identifier.
    name = 'openid'
    # Fixed provider URL; None means "take it from the request data".
    URL = None
    # Key of the user-details value used as username when the
    # USERNAME_KEY setting is not defined.
    USERNAME_KEY = 'username'

    def get_user_id(self, details, response):
        """Return user unique id provided by service"""
        return response.identity_url

    def get_ax_attributes(self):
        """Return the (schema URI, alias) pairs requested through AX.

        Attributes from the AX_SCHEMA_ATTRS setting are returned alone when
        the IGNORE_DEFAULT_AX_ATTRS setting is truthy (its default);
        otherwise they are followed by the module defaults.
        """
        attrs = self.setting('AX_SCHEMA_ATTRS', [])
        if attrs and self.setting('IGNORE_DEFAULT_AX_ATTRS', True):
            return attrs
        # Settings are frequently written as tuples (or may be None);
        # normalise to a list so the concatenation cannot raise TypeError.
        return list(attrs or ()) + AX_SCHEMA_ATTRS + OLD_AX_ATTRS

    def get_sreg_attributes(self):
        """Return the (field, alias) pairs requested through SReg."""
        return self.setting('SREG_ATTR') or SREG_ATTR

    def values_from_response(self, response, sreg_names=None, ax_names=None):
        """Return values from SimpleRegistration response or
        AttributeExchange response if present.

        @sreg_names and @ax_names must be a list of name and aliases
        for such name. The alias will be used as mapping key.
        """
        values = {}

        # Use Simple Registration attributes if provided
        if sreg_names:
            resp = sreg.SRegResponse.fromSuccessResponse(response)
            if resp:
                values.update((alias, resp.get(name) or '')
                              for name, alias in sreg_names)

        # Use Attribute Exchange attributes if provided
        if ax_names:
            resp = ax.FetchResponse.fromSuccessResponse(response)
            if resp:
                for src, alias in ax_names:
                    # Legacy ("old_") aliases feed the same key as their
                    # modern counterpart and only fill it when still empty.
                    name = alias.replace('old_', '')
                    # Fall back to '' (not None) so missing AX values look
                    # the same as missing SReg values above.
                    values[name] = (resp.getSingle(src, '') or
                                    values.get(name, ''))
        return values

    def get_user_details(self, response):
        """Return user details from an OpenID request"""
        values = {'username': '', 'email': '', 'fullname': '',
                  'first_name': '', 'last_name': ''}
        # update values using SimpleRegistration or AttributeExchange
        # values
        values.update(self.values_from_response(
            response, self.get_sreg_attributes(), self.get_ax_attributes()
        ))

        fullname = values.get('fullname') or ''
        first_name = values.get('first_name') or ''
        last_name = values.get('last_name') or ''

        if not fullname and first_name and last_name:
            fullname = first_name + ' ' + last_name
        elif fullname:
            try:
                first_name, last_name = fullname.rsplit(' ', 1)
            except ValueError:
                # Single-word name: there is nothing to split on.
                last_name = fullname

        username_key = self.setting('USERNAME_KEY') or self.USERNAME_KEY
        values.update({'fullname': fullname, 'first_name': first_name,
                       'last_name': last_name,
                       'username': values.get(username_key) or
                                   (first_name.title() + last_name.title())})
        return values

    def extra_data(self, user, uid, response, details):
        """Return defined extra data names to store in extra_data field.

        Settings will be inspected to get more values names that should be
        stored on extra_data field. Setting name is created from current
        backend name (all uppercase) plus _SREG_EXTRA_DATA and
        _AX_EXTRA_DATA because values can be returned by SimpleRegistration
        or AttributeExchange schemas.

        Both list must be a value name and an alias mapping similar to
        SREG_ATTR, OLD_AX_ATTRS or AX_SCHEMA_ATTRS
        """
        sreg_names = self.setting('SREG_EXTRA_DATA')
        ax_names = self.setting('AX_EXTRA_DATA')
        values = self.values_from_response(response, sreg_names, ax_names)
        # The parent is given an empty response on purpose: response-derived
        # values were already extracted above.
        from_details = super(OpenIdAuth, self).extra_data(
            user, uid, {}, details
        )
        values.update(from_details)
        return values

    def auth_url(self):
        """Return auth URL returned by service"""
        openid_request = self.setup_request(self.auth_extra_arguments())
        # Construct completion URL, including page we should redirect to
        return_to = self.strategy.absolute_uri(self.redirect_uri)
        return openid_request.redirectURL(self.trust_root(), return_to)

    def auth_html(self):
        """Return auth HTML returned by service"""
        openid_request = self.setup_request(self.auth_extra_arguments())
        return_to = self.strategy.absolute_uri(self.redirect_uri)
        form_tag = {'id': 'openid_message'}
        return openid_request.htmlMarkup(self.trust_root(), return_to,
                                         form_tag_attrs=form_tag)

    def trust_root(self):
        """Return trust-root option"""
        return self.setting('OPENID_TRUST_ROOT') or \
            self.strategy.absolute_uri('/')

    def continue_pipeline(self, *args, **kwargs):
        """Continue previous halted pipeline"""
        response = self.consumer().complete(
            dict(self.data.items()),
            self.strategy.absolute_uri(self.redirect_uri)
        )
        kwargs.update({'response': response, 'backend': self})
        return self.strategy.authenticate(*args, **kwargs)

    def auth_complete(self, *args, **kwargs):
        """Complete auth process"""
        response = self.consumer().complete(
            dict(self.data.items()),
            self.strategy.absolute_uri(self.redirect_uri)
        )
        self.process_error(response)
        kwargs.update({'response': response, 'backend': self})
        return self.strategy.authenticate(*args, **kwargs)

    def process_error(self, data):
        """Raise the matching auth exception unless `data` is a success.

        Raises AuthException for a missing response, AuthFailed on FAILURE,
        AuthCanceled on CANCEL and AuthUnknownError for any other
        non-SUCCESS status.
        """
        if not data:
            raise AuthException(self, 'OpenID relying party endpoint')
        elif data.status == FAILURE:
            raise AuthFailed(self, data.message)
        elif data.status == CANCEL:
            raise AuthCanceled(self)
        elif data.status != SUCCESS:
            raise AuthUnknownError(self, data.status)

    def setup_request(self, params=None):
        """Setup request"""
        request = self.openid_request(params)
        # Request some user details. Use attribute exchange if provider
        # advertises support.
        if request.endpoint.supportsType(ax.AXMessage.ns_uri):
            fetch_request = ax.FetchRequest()
            # Mark all attributes as required, Google ignores optional ones
            for attr, alias in self.get_ax_attributes():
                fetch_request.add(ax.AttrInfo(attr, alias=alias,
                                              required=True))
        else:
            fetch_request = sreg.SRegRequest(
                optional=list(dict(self.get_sreg_attributes()).keys())
            )
        request.addExtension(fetch_request)

        # Add the PAPE extension if configured
        preferred_policies = self.setting(
            'OPENID_PAPE_PREFERRED_AUTH_POLICIES'
        )
        preferred_level_types = self.setting(
            'OPENID_PAPE_PREFERRED_AUTH_LEVEL_TYPES'
        )
        max_age = self.setting('OPENID_PAPE_MAX_AUTH_AGE')
        if max_age is not None:
            try:
                max_age = int(max_age)
            except (ValueError, TypeError):
                # A non-numeric setting is treated as "not configured".
                max_age = None

        if max_age is not None or preferred_policies or preferred_level_types:
            pape_request = pape.Request(
                max_auth_age=max_age,
                preferred_auth_policies=preferred_policies,
                preferred_auth_level_types=preferred_level_types
            )
            request.addExtension(pape_request)
        return request

    def consumer(self):
        """Return the OpenID Consumer for this backend instance.

        The consumer is built on first use from the strategy's OpenID store
        and then cached on the instance.
        """
        if not hasattr(self, '_consumer'):
            self._consumer = self.create_consumer(self.strategy.openid_store())
        return self._consumer

    def create_consumer(self, store=None):
        """Build a Consumer bound to the strategy's OpenID session dict."""
        return Consumer(self.strategy.openid_session_dict(SESSION_NAME), store)

    def uses_redirect(self):
        """Return true if openid request will be handled with redirect or
        HTML content will be returned.
        """
        return self.openid_request().shouldSendRedirect()

    def openid_request(self, params=None):
        """Return openid request"""
        try:
            return self.consumer().begin(url_add_parameters(self.openid_url(),
                                                            params))
        except DiscoveryFailure as err:
            raise AuthException(self, 'OpenID discovery error: {0}'.format(
                err
            ))

    def openid_url(self):
        """Return service provider URL.
        This base class is generic accepting a POST parameter that specifies
        provider URL."""
        if self.URL:
            return self.URL
        elif OPENID_ID_FIELD in self.data:
            return self.data[OPENID_ID_FIELD]
        else:
            raise AuthMissingParameter(self, OPENID_ID_FIELD)
class OpenIdConnectAssociation(object):
    """Use Association model to save the nonce by force.

    Only ``handle`` (the nonce) and ``assoc_type`` (the OAuth2 state) carry
    meaningful data; the remaining fields exist to fill out the
    association shape and are not used.
    """

    def __init__(self, handle, secret='', issued=0, lifetime=0, assoc_type=''):
        self.handle = handle  # as nonce
        # not use -- stored as bytes; a secret that is already bytes is kept
        # as-is instead of failing on the missing ``bytes.encode``.
        self.secret = secret if isinstance(secret, bytes) else secret.encode()
        self.issued = issued  # not use
        self.lifetime = lifetime  # not use
        self.assoc_type = assoc_type  # as state
class OpenIdConnectAuth(BaseOAuth2):
    """
    Base class for Open ID Connect backends.
    Currently only the code response type is supported.
    """
    # Expected value of the id_token "iss" claim; subclasses set this.
    ID_TOKEN_ISSUER = None
    DEFAULT_SCOPE = ['openid']
    EXTRA_DATA = ['id_token', 'refresh_token', ('sub', 'id')]
    # Set after access_token is retrieved
    id_token = None

    def auth_params(self, state=None):
        """Return extra arguments needed on auth process."""
        params = super(OpenIdConnectAuth, self).auth_params(state)
        params['nonce'] = self.get_and_store_nonce(
            self.AUTHORIZATION_URL, state
        )
        return params

    def auth_complete_params(self, state=None):
        """Return the token-request parameters, with a fresh nonce added."""
        params = super(OpenIdConnectAuth, self).auth_complete_params(state)
        # Add a nonce to the request so that to help counter CSRF
        params['nonce'] = self.get_and_store_nonce(
            self.ACCESS_TOKEN_URL, state
        )
        return params

    def get_and_store_nonce(self, url, state):
        """Create a random nonce, store it under `url` and return it."""
        # Create a nonce
        nonce = self.strategy.random_string(64)
        # Store the nonce
        association = OpenIdConnectAssociation(nonce, assoc_type=state)
        self.strategy.storage.association.store(url, association)
        return nonce

    def get_nonce(self, nonce):
        """Return the stored association for `nonce`, or None if unknown.

        Nonces are stored under ACCESS_TOKEN_URL by auth_complete_params
        and under AUTHORIZATION_URL by auth_params, so both are searched.
        The id_token normally echoes the authorization-request nonce, which
        a lookup restricted to ACCESS_TOKEN_URL could never find.
        """
        lookup = self.strategy.storage.association.get
        for server_url in (self.ACCESS_TOKEN_URL, self.AUTHORIZATION_URL):
            try:
                return lookup(server_url=server_url, handle=nonce)[0]
            except IndexError:
                # Nothing stored under this URL; try the next one.
                continue
        return None

    def remove_nonce(self, nonce_id):
        """Delete the stored association with the given id."""
        self.strategy.storage.association.remove([nonce_id])

    def validate_and_return_id_token(self, id_token):
        """
        Validates the id_token according to the steps at
        http://openid.net/specs/openid-connect-core-1_0.html#IDTokenValidation.

        Returns the decoded claims. Every validation failure, including a
        missing claim, is reported as AuthTokenError.
        """
        client_id, _client_secret = self.get_key_and_secret()
        decryption_key = self.setting('ID_TOKEN_DECRYPTION_KEY')
        try:
            # Decode the JWT and raise an error if the secret is invalid or
            # the response has expired.
            id_token = jwt_decode(id_token, decryption_key)
        except (DecodeError, ExpiredSignature) as de:
            raise AuthTokenError(self, de)

        # Verify the issuer of the id_token is correct. A token without an
        # "iss" claim is rejected rather than surfacing as a KeyError.
        if 'iss' not in id_token or id_token['iss'] != self.ID_TOKEN_ISSUER:
            raise AuthTokenError(self, 'Incorrect id_token: iss')

        # Verify the token was issued in the last 10 minutes
        utc_timestamp = timegm(datetime.datetime.utcnow().utctimetuple())
        try:
            issued_too_long_ago = id_token['iat'] < (utc_timestamp - 600)
        except (KeyError, TypeError):
            # Missing or non-comparable "iat" claim.
            issued_too_long_ago = True
        if issued_too_long_ago:
            raise AuthTokenError(self, 'Incorrect id_token: iat')

        # Verify this client is the correct recipient of the id_token.
        # "aud" may be a single string or an array of strings.
        aud = id_token.get('aud')
        if isinstance(aud, (list, tuple)):
            # With an array audience, an "azp" claim (when present) must
            # also name this client.
            azp = id_token.get('azp')
            aud_ok = client_id in aud and (azp is None or azp == client_id)
        else:
            aud_ok = aud == client_id
        if not aud_ok:
            raise AuthTokenError(self, 'Incorrect id_token: aud')

        # Validate the nonce to ensure the request was not modified
        nonce = id_token.get('nonce')
        if not nonce:
            raise AuthTokenError(self, 'Incorrect id_token: nonce')

        nonce_obj = self.get_nonce(nonce)
        if nonce_obj:
            # A nonce is single-use: drop it once it has been matched.
            self.remove_nonce(nonce_obj.id)
        else:
            raise AuthTokenError(self, 'Incorrect id_token: nonce')
        return id_token

    def request_access_token(self, *args, **kwargs):
        """
        Retrieve the access token. Also, validate the id_token and
        store it (temporarily).

        Raises AuthTokenError when the response carries no id_token or the
        id_token fails validation.
        """
        response = self.get_json(*args, **kwargs)
        try:
            raw_id_token = response['id_token']
        except KeyError:
            raise AuthTokenError(self, 'Incorrect id_token: missing')
        self.id_token = self.validate_and_return_id_token(raw_id_token)
        return response