1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
import functools
from flask import Response
from flask import g
from flask import json
from flask import request as _req
from werkzeug.local import LocalProxy
from authlib.consts import default_json_headers
from authlib.oauth1 import ResourceProtector as _ResourceProtector
from authlib.oauth1.errors import OAuth1Error
class ResourceProtector(_ResourceProtector):
    """A protecting method for resource servers. Initialize a resource
    protector with these methods:

    1. query_client
    2. query_token
    3. exists_nonce

    Usually, a ``query_client`` method would look like (if using SQLAlchemy)::

        def query_client(client_id):
            return Client.query.filter_by(client_id=client_id).first()

    A ``query_token`` method accepts two parameters, ``client_id`` and
    ``oauth_token``::

        def query_token(client_id, oauth_token):
            return Token.query.filter_by(
                client_id=client_id, oauth_token=oauth_token
            ).first()

    And for ``exists_nonce``, if using cache, we have a built-in hook to
    create this method::

        from authlib.integrations.flask_oauth1 import create_exists_nonce_func

        exists_nonce = create_exists_nonce_func(cache)

    Then initialize the resource protector with those methods::

        require_oauth = ResourceProtector(
            app,
            query_client=query_client,
            query_token=query_token,
            exists_nonce=exists_nonce,
        )
    """

    def __init__(
        self, app=None, query_client=None, query_token=None, exists_nonce=None
    ):
        """Store the lookup callbacks and, when ``app`` is given, bind to it.

        :param app: optional Flask application; if truthy, ``init_app`` is
            called with it immediately.
        :param query_client: callable ``(client_id)`` returning a client.
        :param query_token: callable ``(client_id, oauth_token)`` returning
            a token credential.
        :param exists_nonce: callable
            ``(nonce, timestamp, client_id, oauth_token)``.
        """
        self.query_client = query_client
        self.query_token = query_token
        self._exists_nonce = exists_nonce

        self.app = app
        if app:
            self.init_app(app)

    def init_app(self, app, query_client=None, query_token=None, exists_nonce=None):
        """Bind the protector to ``app`` and optionally replace callbacks.

        Callbacks passed as ``None`` keep whatever was configured before.
        The ``OAUTH1_SUPPORTED_SIGNATURE_METHODS`` config value is applied
        only when it is a non-empty list or tuple.
        """
        if query_client is not None:
            self.query_client = query_client
        if query_token is not None:
            self.query_token = query_token
        if exists_nonce is not None:
            self._exists_nonce = exists_nonce

        methods = app.config.get("OAUTH1_SUPPORTED_SIGNATURE_METHODS")
        if methods and isinstance(methods, (list, tuple)):
            self.SUPPORTED_SIGNATURE_METHODS = methods

        self.app = app

    def get_client_by_id(self, client_id):
        """Return the client found by the configured ``query_client``.

        :raises RuntimeError: if no ``query_client`` callback was configured.
        """
        # Fail with an explicit message (mirroring ``exists_nonce``) instead
        # of an opaque "'NoneType' object is not callable" TypeError.
        if self.query_client is None:
            raise RuntimeError('"query_client" function is required.')
        return self.query_client(client_id)

    def get_token_credential(self, request):
        """Return the token credential for ``request`` via ``query_token``.

        The callback receives ``request.client_id`` and ``request.token``.

        :raises RuntimeError: if no ``query_token`` callback was configured.
        """
        if self.query_token is None:
            raise RuntimeError('"query_token" function is required.')
        return self.query_token(request.client_id, request.token)

    def exists_nonce(self, nonce, request):
        """Return the result of the configured ``exists_nonce`` callback.

        The callback is invoked with ``nonce`` plus the ``timestamp``,
        ``client_id`` and ``token`` attributes read from ``request``.

        :raises RuntimeError: if no ``exists_nonce`` callback was configured.
        """
        if not self._exists_nonce:
            raise RuntimeError('"exists_nonce" function is required.')

        timestamp = request.timestamp
        client_id = request.client_id
        token = request.token
        return self._exists_nonce(nonce, timestamp, client_id, token)

    def acquire_credential(self):
        """Validate the current Flask request and return its credential.

        The request's method, URL, flattened form data and headers are
        passed to ``validate_request`` (not defined in this class; expected
        from the base class). The resulting credential is also stored on
        ``flask.g`` as ``authlib_server_oauth1_credential``.
        """
        req = self.validate_request(
            _req.method, _req.url, _req.form.to_dict(flat=True), _req.headers
        )
        g.authlib_server_oauth1_credential = req.credential
        return req.credential

    def __call__(self, scope=None):
        """Return a decorator that protects a view with OAuth 1 validation.

        The decorated view first calls ``acquire_credential``. If that
        raises ``OAuth1Error``, a JSON ``Response`` built from the error's
        body and ``status_code`` is returned instead of calling the view.

        :param scope: accepted but not used by this implementation.
        """

        def wrapper(f):
            @functools.wraps(f)
            def decorated(*args, **kwargs):
                try:
                    self.acquire_credential()
                except OAuth1Error as error:
                    body = dict(error.get_body())
                    return Response(
                        json.dumps(body),
                        status=error.status_code,
                        headers=default_json_headers,
                    )
                return f(*args, **kwargs)

            return decorated

        return wrapper
def _get_current_credential():
    """Fetch the ``authlib_server_oauth1_credential`` entry from ``flask.g``.

    The lookup goes through ``g.get``, so the result is whatever that call
    yields for the key in the current context.
    """
    credential = g.get("authlib_server_oauth1_credential")
    return credential
# Module-level proxy backed by ``_get_current_credential``: it exposes the
# value that ``ResourceProtector.acquire_credential`` stores on ``flask.g``
# under ``authlib_server_oauth1_credential``.
current_credential = LocalProxy(_get_current_credential)
|