1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
|
"""
This module implements an example server for the OpenID library. Some
functionality has been omitted intentionally; this code is intended to
be instructive on the use of this library. This server does not
perform actual user authentication and serves up only one OpenID URL,
with the exception of IDP-generated identifiers.
Some code conventions used here:
* 'request' is a Django request object.
* 'openid_request' is an OpenID library request object.
* 'openid_response' is an OpenID library response object.
"""
import cgi
from djopenid import util
from djopenid.util import getViewURL
from django import http
from django.template import RequestContext
from django.shortcuts import render_to_response
from openid.server.server import Server, ProtocolError, CheckIDRequest, \
EncodingError
from openid.server.trustroot import verifyReturnTo
from openid.yadis.discover import DiscoveryFailure
from openid.consumer.discover import OPENID_IDP_2_0_TYPE
from openid.extensions import sreg
from openid.extensions import pape
from openid.fetchers import HTTPFetchingError
def getOpenIDStore():
    """
    Build the OpenID store object used by this example server.

    The choice of store implementation is delegated to
    util.getOpenIDStore, which picks one that suits the
    currently-configured database backend, if any.
    """
    store_location = '/tmp/djopenid_s_store'
    store_prefix = 's_'
    return util.getOpenIDStore(store_location, store_prefix)
def getServer(request):
    """
    Create the OpenID Server object used to handle this request.

    The server is constructed from this module's store and the URL
    of the 'endpoint' view.
    """
    store = getOpenIDStore()
    endpoint_url = getViewURL(request, endpoint)
    return Server(store, endpoint_url)
def setRequest(request, openid_request):
    """
    Remember the OpenID request in the Django session.

    Any falsy value is normalised to None before it is stored.
    """
    request.session['openid_request'] = openid_request or None
def getRequest(request):
    """
    Look up the OpenID request previously saved in the session.

    Returns whatever is stored under 'openid_request', or None when
    the session has no such entry.
    """
    session = request.session
    return session.get('openid_request')
def server(request):
    """
    Render the server's primary web page.

    The template receives the URL of the identity page and the URL
    of the IDP XRDS document.
    """
    context = {
        'user_url': getViewURL(request, idPage),
        'server_xrds_url': getViewURL(request, idpXrds),
    }
    return render_to_response(
        'server/index.html',
        context,
        context_instance=RequestContext(request))
def idpXrds(request):
    """
    Serve the IDP's XRDS document, which relying parties use for
    IDP-driven identifier selection.

    The document advertises the OpenID 2.0 IDP type at the URL of
    the 'endpoint' view.
    """
    type_uris = [OPENID_IDP_2_0_TYPE]
    endpoint_urls = [getViewURL(request, endpoint)]
    return util.renderXRDS(request, type_uris, endpoint_urls)
def idPage(request):
    """
    Render the identity page for OpenID URLs.

    The template is given the URL of the 'endpoint' view as
    'server_url'.
    """
    context = {'server_url': getViewURL(request, endpoint)}
    return render_to_response(
        'server/idPage.html',
        context,
        context_instance=RequestContext(request))
def trustPage(request):
    """
    Render the trust page, on which the user decides whether to
    approve the OpenID verification.

    The template is given the URL that the decision form should be
    submitted to.
    """
    context = {
        'trust_handler_url': getViewURL(request, processTrustResult),
    }
    return render_to_response(
        'server/trust.html',
        context,
        context_instance=RequestContext(request))
def endpoint(request):
    """
    Handle low-level OpenID protocol messages.

    The query (GET parameters, falling back to POST) is decoded into
    an OpenID request and dispatched:

    * an invalid request renders the endpoint page with the error;
    * no request at all renders the plain endpoint page;
    * checkid_* requests go to handleCheckIDRequest;
    * anything else is answered by the OpenID server directly.
    """
    def render_endpoint_page(context):
        # Both the error case and the "nothing to do" case use the
        # same template; only the context differs.
        return render_to_response(
            'server/endpoint.html',
            context,
            context_instance=RequestContext(request))

    openid_server = getServer(request)
    params = util.normalDict(request.GET or request.POST)

    # Turn the raw parameters into an OpenID library request object.
    try:
        openid_request = openid_server.decodeRequest(params)
    except ProtocolError as why:
        # The incoming request was invalid.
        return render_endpoint_page({'error': str(why)})

    # Nothing was decoded: just tell the visitor this is an endpoint.
    if openid_request is None:
        return render_endpoint_page({})

    # checkid_* modes are resolved by asking the user or by checking
    # the session.
    if openid_request.mode in ("checkid_immediate", "checkid_setup"):
        return handleCheckIDRequest(request, openid_request)

    # Every other kind of OpenID request is left to the server.
    openid_response = openid_server.handleRequest(openid_request)
    return displayResponse(request, openid_response)
def handleCheckIDRequest(request, openid_request):
    """
    Handle checkid_* requests.

    Unless the request is an IDP-driven identifier selection, the
    claimed identity must be the one identity page this server
    serves; otherwise a protocol error is returned. Immediate-mode
    requests are always declined. Setup-mode requests are saved in
    the session and the user is shown the trust decision page.
    """
    # For identifier-selection requests (the IDP URL was entered at
    # the RP) there is no claimed identity to check. A full-featured
    # provider could interact with the user here to pick the URL.
    if not openid_request.idSelect():
        served_identity = getViewURL(request, idPage)

        # Refuse to vouch for any identifier other than our own.
        if served_identity != openid_request.identity:
            failure = ProtocolError(
                openid_request.message,
                "This server cannot verify the URL %r" %
                (openid_request.identity,))
            return displayResponse(request, failure)

    if not openid_request.immediate:
        # Keep the request in the session so that the trust handler
        # can retrieve it once the user has decided.
        setRequest(request, openid_request)
        return showDecidePage(request, openid_request)

    # Immediate mode is always answered with 'cancel' because this
    # example does not track a logged-in user. If it did, the answer
    # would depend on whether that user is logged in and has already
    # trusted the request's trust root.
    declined = openid_request.answer(False)
    return displayResponse(request, declined)
def showDecidePage(request, openid_request):
    """
    Render a page to the user so a trust decision can be made.

    The return_to URL is verified against the trust root and the
    outcome is handed to the template as one of the strings
    "Valid", "Invalid", "DISCOVERY_FAILED" or "Unreachable".

    @type openid_request: openid.server.server.CheckIDRequest
    """
    trust_root = openid_request.trust_root
    return_to = openid_request.return_to

    # Only the verification call itself can raise; the mapping to a
    # display string happens in the else branch.
    try:
        return_to_verified = verifyReturnTo(trust_root, return_to)
    except DiscoveryFailure:
        trust_root_valid = "DISCOVERY_FAILED"
    except HTTPFetchingError:
        trust_root_valid = "Unreachable"
    else:
        # Stringify because template's ifequal can only compare to
        # strings.
        trust_root_valid = "Valid" if return_to_verified else "Invalid"

    pape_request = pape.Request.fromOpenIDRequest(openid_request)

    return render_to_response(
        'server/trust.html',
        {'trust_root': trust_root,
         'trust_handler_url': getViewURL(request, processTrustResult),
         'trust_root_valid': trust_root_valid,
         'pape_request': pape_request,
         },
        context_instance=RequestContext(request))
def processTrustResult(request):
    """
    Handle the result of a trust decision and respond to the RP
    accordingly.

    The pending OpenID request is read back from the session. If
    there is none (for example the URL was visited directly or the
    session was lost), the endpoint page is rendered with an error
    instead of failing on a missing request object.

    When the POST data contains 'allow', a positive assertion is
    produced and fixed example Simple Registration data plus a PAPE
    response are attached to it; otherwise the request is declined.
    """
    # Get the request from the session so we can construct the
    # appropriate response.
    openid_request = getRequest(request)

    # Without a stored request there is nothing to answer.
    if openid_request is None:
        return render_to_response(
            'server/endpoint.html',
            {'error': 'No OpenID request is pending for this session.'},
            context_instance=RequestContext(request))

    # The identifier that this server can vouch for
    response_identity = getViewURL(request, idPage)

    # If the decision was to allow the verification, respond
    # accordingly.
    allowed = 'allow' in request.POST

    # Generate a response with the appropriate answer.
    openid_response = openid_request.answer(allowed,
                                            identity=response_identity)

    # Send Simple Registration data in the response, if appropriate.
    if allowed:
        sreg_data = {
            'fullname': 'Example User',
            'nickname': 'example',
            'dob': '1970-01-01',
            'email': 'invalid@example.com',
            'gender': 'F',
            'postcode': '12345',
            'country': 'ES',
            'language': 'eu',
            'timezone': 'America/New_York',
        }

        sreg_req = sreg.SRegRequest.fromOpenIDRequest(openid_request)
        sreg_resp = sreg.SRegResponse.extractResponse(sreg_req, sreg_data)
        openid_response.addExtension(sreg_resp)

        pape_response = pape.Response()
        pape_response.setAuthLevel(pape.LEVELS_NIST, 0)
        openid_response.addExtension(pape_response)

    return displayResponse(request, openid_response)
def displayResponse(request, openid_response):
    """
    Display an OpenID response. Errors will be displayed directly to
    the user; successful responses and other protocol-level messages
    will be sent using the proper mechanism (i.e., direct response,
    redirection, etc.).

    Returns a Django response: either the rendered endpoint page
    carrying the escaped error text, or an HttpResponse built from
    the body, status code and headers of the encoded OpenID response.
    """
    s = getServer(request)

    # Encode the response into something that is renderable.
    try:
        webresponse = s.encodeResponse(openid_response)
    except EncodingError as why:
        # cgi.escape() was removed in Python 3.8; html.escape() is its
        # replacement. Fall back to cgi only where html is unavailable.
        try:
            from html import escape
        except ImportError:
            from cgi import escape

        # If it couldn't be encoded, display an error.
        text = why.response.encodeToKVForm()
        return render_to_response(
            'server/endpoint.html',
            # quote=False keeps cgi.escape()'s default of leaving
            # quote characters unescaped.
            {'error': escape(text, quote=False)},
            context_instance=RequestContext(request))

    # Construct the appropriate django framework response.
    r = http.HttpResponse(webresponse.body)
    r.status_code = webresponse.code

    for header, value in webresponse.headers.items():
        r[header] = value

    return r
|