1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
|
import logging
from django.conf import settings
from django.core.cache import cache
from django.http import HttpResponse
from authlib.common.security import generate_token
from authlib.common.urls import url_encode
from authlib.oauth1 import AuthorizationServer as _AuthorizationServer
from authlib.oauth1 import OAuth1Request
from authlib.oauth1 import TemporaryCredential
from .nonce import exists_nonce_in_cache
# Module-level logger named after this module's import path.
log = logging.getLogger(__name__)
class BaseServer(_AuthorizationServer):
    """OAuth 1.0 authorization server base backed by Django models.

    Clients are looked up through ``client_model`` and token credentials are
    persisted through ``token_model``.  Options are read from the
    ``AUTHLIB_OAUTH1_PROVIDER`` dict in Django settings; an empty dict is
    used when that setting is absent.
    """

    def __init__(self, client_model, token_model, token_generator=None):
        """Store the model classes and load provider configuration.

        :param client_model: model class used by :meth:`get_client_by_id`.
        :param token_model: model class instantiated by
            :meth:`create_token_credential`.
        :param token_generator: optional zero-argument callable returning a
            dict with ``oauth_token`` and ``oauth_token_secret`` keys.  When
            omitted, a generator built on ``generate_token`` is used.
        """
        self.client_model = client_model
        self.token_model = token_model

        def _default_token_generator():
            return {
                "oauth_token": generate_token(42),
                "oauth_token_secret": generate_token(48),
            }

        self.token_generator = (
            _default_token_generator if token_generator is None else token_generator
        )

        config = getattr(settings, "AUTHLIB_OAUTH1_PROVIDER", {})
        self._config = config
        self._nonce_expires_in = config.get("nonce_expires_in", 86400)

        # Only override the inherited attribute when the setting is non-empty.
        supported_methods = config.get("signature_methods")
        if supported_methods:
            self.SUPPORTED_SIGNATURE_METHODS = supported_methods

    def get_client_by_id(self, client_id):
        """Return the client row whose ``client_id`` matches, or ``None``."""
        model = self.client_model
        try:
            client = model.objects.get(client_id=client_id)
        except model.DoesNotExist:
            return None
        return client

    def exists_nonce(self, nonce, request):
        """Delegate the nonce check to ``exists_nonce_in_cache``.

        The configured ``nonce_expires_in`` value is forwarded as the third
        argument.
        """
        expires_in = self._nonce_expires_in
        return exists_nonce_in_cache(nonce, request, expires_in)

    def create_token_credential(self, request):
        """Create, save and return a ``token_model`` instance.

        The user id and client id are taken from the temporary credential
        attached to ``request``; the token and secret come from
        ``self.token_generator``.
        """
        credential = request.credential
        generated = self.token_generator()
        record = self.token_model(
            oauth_token=generated["oauth_token"],
            oauth_token_secret=generated["oauth_token_secret"],
            user_id=credential.get_user_id(),
            client_id=credential.get_client_id(),
        )
        record.save()
        return record

    def check_authorization_request(self, request):
        """Convert ``request`` to an ``OAuth1Request``, validate it, return it."""
        oauth_request = self.create_oauth1_request(request)
        self.validate_authorization_request(oauth_request)
        return oauth_request

    def create_oauth1_request(self, request):
        """Build an ``OAuth1Request`` from a Django request.

        The body is the POST data as a plain dict for ``POST`` requests and
        ``None`` for every other method.
        """
        body = request.POST.dict() if request.method == "POST" else None
        absolute_url = request.build_absolute_uri()
        return OAuth1Request(request.method, absolute_url, body, request.headers)

    def handle_response(self, status_code, payload, headers):
        """Return an ``HttpResponse`` with a URL-encoded body.

        :param status_code: HTTP status for the response.
        :param payload: value passed to ``url_encode`` to form the body.
        :param headers: iterable of ``(name, value)`` pairs set on the
            response.
        """
        response = HttpResponse(url_encode(payload), status=status_code)
        for name, value in headers:
            response[name] = value
        return response
class CacheAuthorizationServer(BaseServer):
    """Authorization server that keeps temporary credentials in Django's cache.

    Cache keys are the configured ``temporary_credential_key_prefix``
    (default ``"temporary_credential:"``) followed by the OAuth token.
    Entries are stored with ``temporary_credential_expires_in`` (default
    86400) as the cache timeout.
    """

    def __init__(self, client_model, token_model, token_generator=None):
        """Initialise the base server, then read the cache-related options."""
        super().__init__(client_model, token_model, token_generator)
        config = self._config
        self._temporary_expires_in = config.get(
            "temporary_credential_expires_in", 86400
        )
        self._temporary_credential_key_prefix = config.get(
            "temporary_credential_key_prefix", "temporary_credential:"
        )

    def _temporary_credential_key(self, oauth_token):
        """Return the cache key under which ``oauth_token`` is stored."""
        return self._temporary_credential_key_prefix + oauth_token

    def create_temporary_credential(self, request):
        """Generate a temporary credential, cache it and return it.

        The generated token dict is extended with ``client_id`` and, when the
        request carries a redirect URI, ``oauth_callback``.
        """
        data = self.token_generator()
        client_id = request.client_id
        callback = request.redirect_uri
        cache_key = self._temporary_credential_key(data["oauth_token"])

        data["client_id"] = client_id
        if callback:
            data["oauth_callback"] = callback

        cache.set(cache_key, data, timeout=self._temporary_expires_in)
        return TemporaryCredential(data)

    def get_temporary_credential(self, request):
        """Return the cached credential for ``request.token``.

        Returns ``None`` when the request has no token or the cache holds no
        truthy value for it.
        """
        oauth_token = request.token
        if not oauth_token:
            return None

        cached = cache.get(self._temporary_credential_key(oauth_token))
        if not cached:
            return None
        return TemporaryCredential(cached)

    def delete_temporary_credential(self, request):
        """Remove the cached credential for ``request.token``, if a token is set."""
        oauth_token = request.token
        if not oauth_token:
            return
        cache.delete(self._temporary_credential_key(oauth_token))

    def create_authorization_verifier(self, request):
        """Attach a fresh verifier and the user's pk to the credential.

        The updated credential is written back to the cache under its own
        token key, and the verifier string is returned.
        """
        verifier = generate_token(36)
        credential = request.credential
        user = request.user
        cache_key = self._temporary_credential_key(credential.get_oauth_token())

        credential["oauth_verifier"] = verifier
        credential["user_id"] = user.pk

        cache.set(cache_key, credential, timeout=self._temporary_expires_in)
        return verifier
|