1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
"""Identity policy for storing info in the jwt token.
"""
from typing import Optional
from aiohttp import web
from .abc import AbstractIdentityPolicy
try:
import jwt
HAS_JWT = True
except ImportError: # pragma: no cover
HAS_JWT = False
AUTH_HEADER_NAME = 'Authorization'
AUTH_SCHEME = 'Bearer '
class JWTIdentityPolicy(AbstractIdentityPolicy):
def __init__(self, secret: str, algorithm: str = "HS256", key: str = "login"):
if not HAS_JWT:
raise RuntimeError('Please install `PyJWT`')
self.secret = secret
self.algorithm = algorithm
self.key = key
async def identify(self, request: web.Request) -> Optional[str]:
header_identity = request.headers.get(AUTH_HEADER_NAME)
if header_identity is None:
return None
if not header_identity.startswith(AUTH_SCHEME):
raise ValueError("Invalid authorization scheme. "
+ "Should be `{}<token>`".format(AUTH_SCHEME))
token = header_identity.split(' ')[1].strip()
identity = jwt.decode(token,
self.secret,
algorithms=[self.algorithm])
return identity.get(self.key) # type: ignore[no-any-return]
async def remember(self, request: web.Request, response: web.StreamResponse,
identity: str, **kwargs: None) -> None:
pass
async def forget(self, request: web.Request, response: web.StreamResponse) -> None:
pass
|