1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
|
from django import http
from django.http import HttpResponseRedirect
from django.views.generic.base import TemplateView
from openid.consumer import consumer
from openid.consumer.discover import DiscoveryFailure
from openid.extensions import ax, pape, sreg
from openid.yadis.constants import YADIS_HEADER_NAME, YADIS_CONTENT_TYPE
from openid.server.trustroot import RP_RETURN_TO_URL_TYPE
from djopenid import util
PAPE_POLICIES = [
'AUTH_PHISHING_RESISTANT',
'AUTH_MULTI_FACTOR',
'AUTH_MULTI_FACTOR_PHYSICAL',
]
# List of (name, uri) for use in generating the request form.
POLICY_PAIRS = [(p, getattr(pape, p))
for p in PAPE_POLICIES]
def getOpenIDStore():
"""
Return an OpenID store object fit for the currently-chosen
database backend, if any.
"""
return util.getOpenIDStore('/tmp/djopenid_c_store', 'c_')
def getConsumer(request):
"""
Get a Consumer object to perform OpenID authentication.
"""
return consumer.Consumer(request.session, getOpenIDStore())
def renderIndexPage(request, **template_args):
template_args['consumer_url'] = util.getViewURL(request, startOpenID)
template_args['pape_policies'] = POLICY_PAIRS
response = TemplateView(
request, 'consumer/index.html', template_args)
response[YADIS_HEADER_NAME] = util.getViewURL(request, rpXRDS)
return response
def startOpenID(request):
"""
Start the OpenID authentication process. Renders an
authentication form and accepts its POST.
* Renders an error message if OpenID cannot be initiated
* Requests some Simple Registration data using the OpenID
library's Simple Registration machinery
* Generates the appropriate trust root and return URL values for
this application (tweak where appropriate)
* Generates the appropriate redirect based on the OpenID protocol
version.
"""
if request.POST:
# Start OpenID authentication.
openid_url = request.POST['openid_identifier']
c = getConsumer(request)
error = None
try:
auth_request = c.begin(openid_url)
except DiscoveryFailure as e:
# Some other protocol-level failure occurred.
error = "OpenID discovery error: %s" % (str(e),)
if error:
# Render the page with an error.
return renderIndexPage(request, error=error)
# Add Simple Registration request information. Some fields
# are optional, some are required. It's possible that the
# server doesn't support sreg or won't return any of the
# fields.
sreg_request = sreg.SRegRequest(optional=['email', 'nickname'],
required=['dob'])
auth_request.addExtension(sreg_request)
# Add Attribute Exchange request information.
ax_request = ax.FetchRequest()
# XXX - uses myOpenID-compatible schema values, which are
# not those listed at axschema.org.
ax_request.add(
ax.AttrInfo('http://schema.openid.net/namePerson',
required=True))
ax_request.add(
ax.AttrInfo('http://schema.openid.net/contact/web/default',
required=False, count=ax.UNLIMITED_VALUES))
auth_request.addExtension(ax_request)
# Add PAPE request information. We'll ask for
# phishing-resistant auth and display any policies we get in
# the response.
requested_policies = []
policy_prefix = 'policy_'
for k, v in request.POST.items():
if k.startswith(policy_prefix):
policy_attr = k[len(policy_prefix):]
if policy_attr in PAPE_POLICIES:
requested_policies.append(getattr(pape, policy_attr))
if requested_policies:
pape_request = pape.Request(requested_policies)
auth_request.addExtension(pape_request)
# Compute the trust root and return URL values to build the
# redirect information.
trust_root = util.getViewURL(request, startOpenID)
return_to = util.getViewURL(request, finishOpenID)
# Send the browser to the server either by sending a redirect
# URL or by generating a POST form.
if auth_request.shouldSendRedirect():
url = auth_request.redirectURL(trust_root, return_to)
return HttpResponseRedirect(url)
else:
# Beware: this renders a template whose content is a form
# and some javascript to submit it upon page load. Non-JS
# users will have to click the form submit button to
# initiate OpenID authentication.
form_id = 'openid_message'
form_html = auth_request.formMarkup(trust_root, return_to,
False, {'id': form_id})
return TemplateView(
request, 'consumer/request_form.html', {'html': form_html})
return renderIndexPage(request)
def finishOpenID(request):
"""
Finish the OpenID authentication process. Invoke the OpenID
library with the response from the OpenID server and render a page
detailing the result.
"""
result = {}
# Because the object containing the query parameters is a
# MultiValueDict and the OpenID library doesn't allow that, we'll
# convert it to a normal dict.
# OpenID 2 can send arguments as either POST body or GET query
# parameters.
request_args = util.normalDict(request.GET)
if request.method == 'POST':
request_args.update(util.normalDict(request.POST))
if request_args:
c = getConsumer(request)
# Get a response object indicating the result of the OpenID
# protocol.
return_to = util.getViewURL(request, finishOpenID)
response = c.complete(request_args, return_to)
# Get a Simple Registration response object if response
# information was included in the OpenID response.
sreg_response = {}
ax_items = {}
if response.status == consumer.SUCCESS:
sreg_response = sreg.SRegResponse.fromSuccessResponse(response)
ax_response = ax.FetchResponse.fromSuccessResponse(response)
if ax_response:
ax_items = {
'fullname': ax_response.get(
'http://schema.openid.net/namePerson'),
'web': ax_response.get(
'http://schema.openid.net/contact/web/default'),
}
# Get a PAPE response object if response information was
# included in the OpenID response.
pape_response = None
if response.status == consumer.SUCCESS:
pape_response = pape.Response.fromSuccessResponse(response)
if not pape_response.auth_policies:
pape_response = None
# Map different consumer status codes to template contexts.
results = {
consumer.CANCEL:
{'message': 'OpenID authentication cancelled.'},
consumer.FAILURE:
{'error': 'OpenID authentication failed.'},
consumer.SUCCESS:
{'url': response.getDisplayIdentifier(),
'sreg': sreg_response and list(sreg_response.items()),
'ax': list(ax_items.items()),
'pape': pape_response}
}
result = results[response.status]
if isinstance(response, consumer.FailureResponse):
# In a real application, this information should be
# written to a log for debugging/tracking OpenID
# authentication failures. In general, the messages are
# not user-friendly, but intended for developers.
result['failure_reason'] = response.message
return renderIndexPage(request, **result)
def rpXRDS(request):
"""
Return a relying party verification XRDS document
"""
return util.renderXRDS(
request,
[RP_RETURN_TO_URL_TYPE],
[util.getViewURL(request, finishOpenID)])
|