File: jwt_identity.py

package info (click to toggle)
python-aiohttp-security 0.5.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 416 kB
  • sloc: python: 1,133; makefile: 193
file content (52 lines) | stat: -rw-r--r-- 1,531 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""Identity policy for storing info in the jwt token.

"""

from typing import Optional

from aiohttp import web

from .abc import AbstractIdentityPolicy

try:
    import jwt
    HAS_JWT = True
except ImportError:  # pragma: no cover
    HAS_JWT = False


AUTH_HEADER_NAME = 'Authorization'
AUTH_SCHEME = 'Bearer '


class JWTIdentityPolicy(AbstractIdentityPolicy):
    def __init__(self, secret: str, algorithm: str = "HS256", key: str = "login"):
        if not HAS_JWT:
            raise RuntimeError('Please install `PyJWT`')
        self.secret = secret
        self.algorithm = algorithm
        self.key = key

    async def identify(self, request: web.Request) -> Optional[str]:
        header_identity = request.headers.get(AUTH_HEADER_NAME)

        if header_identity is None:
            return None

        if not header_identity.startswith(AUTH_SCHEME):
            raise ValueError("Invalid authorization scheme. "
                             + "Should be `{}<token>`".format(AUTH_SCHEME))

        token = header_identity.split(' ')[1].strip()

        identity = jwt.decode(token,
                              self.secret,
                              algorithms=[self.algorithm])
        return identity.get(self.key)  # type: ignore[no-any-return]

    async def remember(self, request: web.Request, response: web.StreamResponse,
                       identity: str, **kwargs: None) -> None:
        pass

    async def forget(self, request: web.Request, response: web.StreamResponse) -> None:
        pass