1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
import functools
from contextlib import contextmanager
from flask import g
from flask import json
from flask import request as _req
from werkzeug.local import LocalProxy
from authlib.oauth2 import OAuth2Error
from authlib.oauth2 import ResourceProtector as _ResourceProtector
from authlib.oauth2.rfc6749 import MissingAuthorizationError
from .errors import raise_http_exception
from .requests import FlaskJsonRequest
from .signals import token_authenticated
class ResourceProtector(_ResourceProtector):
"""A protecting method for resource servers. Creating a ``require_oauth``
decorator easily with ResourceProtector::
from authlib.integrations.flask_oauth2 import ResourceProtector
require_oauth = ResourceProtector()
# add bearer token validator
from authlib.oauth2.rfc6750 import BearerTokenValidator
from project.models import Token
class MyBearerTokenValidator(BearerTokenValidator):
def authenticate_token(self, token_string):
return Token.query.filter_by(access_token=token_string).first()
require_oauth.register_token_validator(MyBearerTokenValidator())
# protect resource with require_oauth
@app.route("/user")
@require_oauth(["profile"])
def user_profile():
user = User.get(current_token.user_id)
return jsonify(user.to_dict())
"""
def raise_error_response(self, error):
"""Raise HTTPException for OAuth2Error. Developers can re-implement
this method to customize the error response.
:param error: OAuth2Error
:raise: HTTPException
"""
status = error.status_code
body = json.dumps(dict(error.get_body()))
headers = error.get_headers()
raise_http_exception(status, body, headers)
def acquire_token(self, scopes=None, **kwargs):
"""A method to acquire current valid token with the given scope.
:param scopes: a list of scope values
:return: token object
"""
request = FlaskJsonRequest(_req)
# backward compatibility
kwargs["scopes"] = scopes
for claim in kwargs:
if isinstance(kwargs[claim], str):
kwargs[claim] = [kwargs[claim]]
token = self.validate_request(request=request, **kwargs)
token_authenticated.send(self, token=token)
g.authlib_server_oauth2_token = token
return token
@contextmanager
def acquire(self, scopes=None):
"""The with statement of ``require_oauth``. Instead of using a
decorator, you can use a with statement instead::
@app.route("/api/user")
def user_api():
with require_oauth.acquire("profile") as token:
user = User.get(token.user_id)
return jsonify(user.to_dict())
"""
try:
yield self.acquire_token(scopes)
except OAuth2Error as error:
self.raise_error_response(error)
def __call__(self, scopes=None, optional=False, **kwargs):
claims = kwargs
# backward compatibility
claims["scopes"] = scopes
def wrapper(f):
@functools.wraps(f)
def decorated(*args, **kwargs):
try:
self.acquire_token(**claims)
except MissingAuthorizationError as error:
if optional:
return f(*args, **kwargs)
self.raise_error_response(error)
except OAuth2Error as error:
self.raise_error_response(error)
return f(*args, **kwargs)
return decorated
return wrapper
def _get_current_token():
return g.get("authlib_server_oauth2_token")
current_token = LocalProxy(_get_current_token)
|