1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
|
from authlib.common.urls import add_params_to_uri
from authlib.common.urls import is_valid_url
from .base_server import BaseServer
from .errors import AccessDeniedError
from .errors import InvalidClientError
from .errors import InvalidRequestError
from .errors import InvalidTokenError
from .errors import MethodNotAllowedError
from .errors import MissingRequiredParameterError
from .errors import OAuth1Error
class AuthorizationServer(BaseServer):
    """OAuth 1.0 authorization server: the three credential endpoints.

    The class wires together the temporary-credentials endpoint, the
    resource owner authorization endpoint and the token-credentials
    endpoint. Request parsing, response rendering and credential storage
    are left to subclasses: every hook below that raises
    ``NotImplementedError`` must be overridden by the integration.

    ``get_client_by_id``, ``validate_timestamp_and_nonce`` and
    ``validate_oauth_signature`` are not defined in this class; they are
    presumably inherited from ``BaseServer``.
    """

    # Headers attached to every successful credentials response.
    TOKEN_RESPONSE_HEADER = [
        ("Content-Type", "application/x-www-form-urlencoded"),
        ("Cache-Control", "no-store"),
        ("Pragma", "no-cache"),
    ]

    # HTTP method accepted by the temporary credentials endpoint; override
    # on a subclass if the server advertises another method.
    TEMPORARY_CREDENTIALS_METHOD = "POST"

    def _get_client(self, request):
        """Look up the client for ``request.client_id`` and bind it.

        The result of ``get_client_by_id`` is stored on ``request.client``
        (even when it is falsy) and returned.
        """
        client = self.get_client_by_id(request.client_id)
        request.client = client
        return client

    def create_oauth1_request(self, request):
        """Convert a framework request into an OAuth1Request.

        Must be implemented by the framework integration.
        """
        raise NotImplementedError()

    def handle_response(self, status_code, payload, headers):
        """Render a framework response from status, payload and headers.

        Must be implemented by the framework integration.
        """
        raise NotImplementedError()

    def handle_error_response(self, error):
        """Render ``error`` through :meth:`handle_response`.

        :param error: an error exposing ``status_code``, ``get_body()``
            and ``get_headers()``.
        """
        return self.handle_response(
            error.status_code, error.get_body(), error.get_headers()
        )

    def validate_temporary_credentials_request(self, request):
        """Validate HTTP request for temporary credentials.

        :param request: OAuth1Request instance.
        :returns: the same ``request``, with ``request.client`` assigned.
        :raises MethodNotAllowedError: HTTP method differs from
            ``TEMPORARY_CREDENTIALS_METHOD``.
        :raises MissingRequiredParameterError: ``oauth_consumer_key`` or
            ``oauth_callback`` is missing.
        :raises InvalidRequestError: ``oauth_callback`` is neither ``"oob"``
            nor a valid URL.
        :raises InvalidClientError: no client matches the consumer key.
        """
        # The client obtains a set of temporary credentials from the server by
        # making an authenticated (Section 3) HTTP "POST" request to the
        # Temporary Credential Request endpoint (unless the server advertises
        # another HTTP request method for the client to use).
        if request.method.upper() != self.TEMPORARY_CREDENTIALS_METHOD:
            raise MethodNotAllowedError()

        # REQUIRED parameter
        if not request.client_id:
            raise MissingRequiredParameterError("oauth_consumer_key")

        # REQUIRED parameter
        oauth_callback = request.redirect_uri
        if not oauth_callback:
            raise MissingRequiredParameterError("oauth_callback")

        # The callback is either an absolute URI or, when the client cannot
        # receive callbacks, the literal value "oob" (out-of-band).
        if oauth_callback != "oob" and not is_valid_url(oauth_callback):
            raise InvalidRequestError('Invalid "oauth_callback" value')

        client = self._get_client(request)
        if not client:
            raise InvalidClientError()

        self.validate_timestamp_and_nonce(request)
        self.validate_oauth_signature(request)
        return request

    def create_temporary_credentials_response(self, request=None):
        """Validate temporary credentials token request and create response
        for temporary credentials token. Assume the endpoint of temporary
        credentials request is ``https://photos.example.net/initiate``:

        .. code-block:: http

            POST /initiate HTTP/1.1
            Host: photos.example.net
            Authorization: OAuth realm="Photos",
                oauth_consumer_key="dpf43f3p2l4k3l03",
                oauth_signature_method="HMAC-SHA1",
                oauth_timestamp="137131200",
                oauth_nonce="wIjqoS",
                oauth_callback="http%3A%2F%2Fprinter.example.com%2Fready",
                oauth_signature="74KNZJeDHnMBp0EMJ9ZHt%2FXKycU%3D"

        The server validates the request and replies with a set of temporary
        credentials in the body of the HTTP response:

        .. code-block:: http

            HTTP/1.1 200 OK
            Content-Type: application/x-www-form-urlencoded

            oauth_token=hh5s93j4hdidpola&oauth_token_secret=hdhd0244k9j7ao03&
            oauth_callback_confirmed=true

        :param request: OAuth1Request instance.
        :returns: (status_code, body, headers)
        """
        try:
            request = self.create_oauth1_request(request)
            self.validate_temporary_credentials_request(request)
        except OAuth1Error as error:
            return self.handle_error_response(error)

        credential = self.create_temporary_credential(request)
        payload = [
            ("oauth_token", credential.get_oauth_token()),
            ("oauth_token_secret", credential.get_oauth_token_secret()),
            # RFC 5849 section 2.1: the parameter MUST be set to the literal
            # "true". A Python ``True`` would be rendered as "True" by
            # encoders that simply stringify values.
            ("oauth_callback_confirmed", "true"),
        ]
        return self.handle_response(200, payload, self.TOKEN_RESPONSE_HEADER)

    def validate_authorization_request(self, request):
        """Validate the request for resource owner authorization.

        :param request: OAuth1Request instance.
        :returns: the same ``request``, with ``request.credential`` set to
            the temporary credential.
        :raises MissingRequiredParameterError: ``oauth_token`` is missing.
        :raises InvalidTokenError: no temporary credential was found.
        """
        if not request.token:
            raise MissingRequiredParameterError("oauth_token")

        credential = self.get_temporary_credential(request)
        if not credential:
            raise InvalidTokenError()

        # assign credential for later use
        request.credential = credential
        return request

    def create_authorization_response(self, request, grant_user=None):
        """Validate authorization request and create authorization response.
        Assume the endpoint for authorization request is
        ``https://photos.example.net/authorize``, the client redirects Jane's
        user-agent to the server's Resource Owner Authorization endpoint to
        obtain Jane's approval for accessing her private photos::

            https://photos.example.net/authorize?oauth_token=hh5s93j4hdidpola

        The server requests Jane to sign in using her username and password
        and if successful, asks her to approve granting 'printer.example.com'
        access to her private photos. Jane approves the request and her
        user-agent is redirected to the callback URI provided by the client
        in the previous request (line breaks are for display purposes only)::

            http://printer.example.com/ready?
            oauth_token=hh5s93j4hdidpola&oauth_verifier=hfdp7dh39dks9884

        Validation errors are raised, not rendered: the authorization
        endpoint is expected to catch them itself.

        :param request: OAuth1Request instance.
        :param grant_user: if granted, pass the grant user, otherwise None.
        :returns: (status_code, body, headers)
        :raises MissingRequiredParameterError: ``oauth_token`` is missing.
        :raises InvalidTokenError: no temporary credential was found.
        :raises InvalidClientError: the fallback redirect URI is needed but
            the credential's client cannot be found.
        """
        request = self.create_oauth1_request(request)

        # authorize endpoint should try catch this error
        self.validate_authorization_request(request)

        temporary_credentials = request.credential
        redirect_uri = temporary_credentials.get_redirect_uri()
        if not redirect_uri or redirect_uri == "oob":
            # No usable callback was registered with the temporary
            # credential: fall back to the client's default redirect URI.
            client_id = temporary_credentials.get_client_id()
            client = self.get_client_by_id(client_id)
            if not client:
                # Same error the other endpoints raise for an unknown
                # client, instead of an AttributeError on ``None``.
                raise InvalidClientError()
            redirect_uri = client.get_default_redirect_uri()

        if grant_user is None:
            # Resource owner denied access: report it on the callback.
            error = AccessDeniedError()
            location = add_params_to_uri(redirect_uri, error.get_body())
            return self.handle_response(302, "", [("Location", location)])

        request.user = grant_user
        verifier = self.create_authorization_verifier(request)

        params = [("oauth_token", request.token), ("oauth_verifier", verifier)]
        location = add_params_to_uri(redirect_uri, params)
        return self.handle_response(302, "", [("Location", location)])

    def validate_token_request(self, request):
        """Validate request for issuing token.

        :param request: OAuth1Request instance.
        :returns: the same ``request``, with ``request.client`` and
            ``request.credential`` assigned.
        :raises MissingRequiredParameterError: ``oauth_consumer_key``,
            ``oauth_token`` or ``oauth_verifier`` is missing.
        :raises InvalidClientError: no client matches the consumer key.
        :raises InvalidTokenError: no temporary credential was found.
        :raises InvalidRequestError: the verifier does not match the
            temporary credential.
        """
        if not request.client_id:
            raise MissingRequiredParameterError("oauth_consumer_key")

        client = self._get_client(request)
        if not client:
            raise InvalidClientError()

        if not request.token:
            raise MissingRequiredParameterError("oauth_token")

        token = self.get_temporary_credential(request)
        if not token:
            raise InvalidTokenError()

        verifier = request.oauth_params.get("oauth_verifier")
        if not verifier:
            raise MissingRequiredParameterError("oauth_verifier")

        if not token.check_verifier(verifier):
            raise InvalidRequestError('Invalid "oauth_verifier"')

        # The credential is bound before the signature check runs.
        # NOTE(review): presumably the signature validation reads the token
        # secret from ``request.credential`` - confirm against BaseServer.
        request.credential = token
        self.validate_timestamp_and_nonce(request)
        self.validate_oauth_signature(request)
        return request

    def create_token_response(self, request):
        """Validate token request and create token response. Assuming the
        endpoint of token request is ``https://photos.example.net/token``,
        the callback request informs the client that Jane completed the
        authorization process. The client then requests a set of token
        credentials using its temporary credentials (over a secure Transport
        Layer Security (TLS) channel):

        .. code-block:: http

            POST /token HTTP/1.1
            Host: photos.example.net
            Authorization: OAuth realm="Photos",
                oauth_consumer_key="dpf43f3p2l4k3l03",
                oauth_token="hh5s93j4hdidpola",
                oauth_signature_method="HMAC-SHA1",
                oauth_timestamp="137131201",
                oauth_nonce="walatlh",
                oauth_verifier="hfdp7dh39dks9884",
                oauth_signature="gKgrFCywp7rO0OXSjdot%2FIHF7IU%3D"

        The server validates the request and replies with a set of token
        credentials in the body of the HTTP response:

        .. code-block:: http

            HTTP/1.1 200 OK
            Content-Type: application/x-www-form-urlencoded

            oauth_token=nnch734d00sl2jdk&oauth_token_secret=pfkkdhi9sl3r4s00

        :param request: OAuth1Request instance.
        :returns: (status_code, body, headers)
        """
        try:
            request = self.create_oauth1_request(request)
        except OAuth1Error as error:
            return self.handle_error_response(error)

        try:
            self.validate_token_request(request)
        except OAuth1Error as error:
            # The temporary credential is discarded on any validation
            # failure, so it cannot be retried.
            self.delete_temporary_credential(request)
            return self.handle_error_response(error)

        credential = self.create_token_credential(request)
        payload = [
            ("oauth_token", credential.get_oauth_token()),
            ("oauth_token_secret", credential.get_oauth_token_secret()),
        ]
        # The temporary credential is single-use: remove it once the token
        # credential has been issued.
        self.delete_temporary_credential(request)
        return self.handle_response(200, payload, self.TOKEN_RESPONSE_HEADER)

    def create_temporary_credential(self, request):
        """Generate and save a temporary credential into database or cache.
        A temporary credential is used for exchanging token credential. This
        method should be re-implemented::

            def create_temporary_credential(self, request):
                oauth_token = generate_token(36)
                oauth_token_secret = generate_token(48)
                temporary_credential = TemporaryCredential(
                    oauth_token=oauth_token,
                    oauth_token_secret=oauth_token_secret,
                    client_id=request.client_id,
                    redirect_uri=request.redirect_uri,
                )
                # if the credential has a save method
                temporary_credential.save()
                return temporary_credential

        :param request: OAuth1Request instance
        :return: TemporaryCredential instance
        """
        raise NotImplementedError()

    def get_temporary_credential(self, request):
        """Get the temporary credential from database or cache. A temporary
        credential should share the same methods as described in models of
        ``TemporaryCredentialMixin``::

            def get_temporary_credential(self, request):
                key = "a-key-prefix:{}".format(request.token)
                data = cache.get(key)
                # TemporaryCredential shares methods from TemporaryCredentialMixin
                return TemporaryCredential(data)

        :param request: OAuth1Request instance
        :return: TemporaryCredential instance
        """
        raise NotImplementedError()

    def delete_temporary_credential(self, request):
        """Delete temporary credential from database or cache. For instance,
        if temporary credential is saved in cache::

            def delete_temporary_credential(self, request):
                key = "a-key-prefix:{}".format(request.token)
                cache.delete(key)

        :param request: OAuth1Request instance
        """
        raise NotImplementedError()

    def create_authorization_verifier(self, request):
        """Create and bind ``oauth_verifier`` to temporary credential. It
        could be re-implemented in this way::

            def create_authorization_verifier(self, request):
                verifier = generate_token(36)

                temporary_credential = request.credential
                user_id = request.user.id

                temporary_credential.user_id = user_id
                temporary_credential.oauth_verifier = verifier
                # if the credential has a save method
                temporary_credential.save()

                # remember to return the verifier
                return verifier

        :param request: OAuth1Request instance
        :return: A string of ``oauth_verifier``
        """
        raise NotImplementedError()

    def create_token_credential(self, request):
        """Create and save token credential into database. This method would
        be re-implemented like this::

            def create_token_credential(self, request):
                oauth_token = generate_token(36)
                oauth_token_secret = generate_token(48)
                temporary_credential = request.credential

                token_credential = TokenCredential(
                    oauth_token=oauth_token,
                    oauth_token_secret=oauth_token_secret,
                    client_id=temporary_credential.get_client_id(),
                    user_id=temporary_credential.get_user_id(),
                )
                # if the credential has a save method
                token_credential.save()
                return token_credential

        :param request: OAuth1Request instance
        :return: TokenCredential instance
        """
        raise NotImplementedError()
|