1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
|
import time
def create_query_client_func(session, client_model):
"""Create an ``query_client`` function that can be used in authorization
server.
:param session: SQLAlchemy session
:param client_model: Client model class
"""
def query_client(client_id):
q = session.query(client_model)
return q.filter_by(client_id=client_id).first()
return query_client
def create_save_token_func(session, token_model):
"""Create an ``save_token`` function that can be used in authorization
server.
:param session: SQLAlchemy session
:param token_model: Token model class
"""
def save_token(token, request):
if request.user:
user_id = request.user.get_user_id()
else:
user_id = None
client = request.client
item = token_model(client_id=client.client_id, user_id=user_id, **token)
session.add(item)
session.commit()
return save_token
def create_query_token_func(session, token_model):
"""Create an ``query_token`` function for revocation, introspection
token endpoints.
:param session: SQLAlchemy session
:param token_model: Token model class
"""
def query_token(token, token_type_hint):
q = session.query(token_model)
if token_type_hint == "access_token":
return q.filter_by(access_token=token).first()
elif token_type_hint == "refresh_token":
return q.filter_by(refresh_token=token).first()
# without token_type_hint
item = q.filter_by(access_token=token).first()
if item:
return item
return q.filter_by(refresh_token=token).first()
return query_token
def create_revocation_endpoint(session, token_model):
"""Create a revocation endpoint class with SQLAlchemy session
and token model.
:param session: SQLAlchemy session
:param token_model: Token model class
"""
from authlib.oauth2.rfc7009 import RevocationEndpoint
query_token = create_query_token_func(session, token_model)
class _RevocationEndpoint(RevocationEndpoint):
def query_token(self, token, token_type_hint):
return query_token(token, token_type_hint)
def revoke_token(self, token, request):
now = int(time.time())
hint = request.form.get("token_type_hint")
token.access_token_revoked_at = now
if hint != "access_token":
token.refresh_token_revoked_at = now
session.add(token)
session.commit()
return _RevocationEndpoint
def create_bearer_token_validator(session, token_model):
"""Create an bearer token validator class with SQLAlchemy session
and token model.
:param session: SQLAlchemy session
:param token_model: Token model class
"""
from authlib.oauth2.rfc6750 import BearerTokenValidator
class _BearerTokenValidator(BearerTokenValidator):
def authenticate_token(self, token_string):
q = session.query(token_model)
return q.filter_by(access_token=token_string).first()
return _BearerTokenValidator
|