1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
import binascii
import json
from typing import Any, Callable, Optional
import nacl.exceptions
import nacl.secret
import nacl.utils
from aiohttp import web
from nacl.encoding import Base64Encoder
from . import AbstractStorage, Session
from .log import log
class NaClCookieStorage(AbstractStorage):
"""NaCl Encrypted JSON storage."""
def __init__(
self,
secret_key: bytes,
*,
cookie_name: str = "AIOHTTP_SESSION",
domain: Optional[str] = None,
max_age: Optional[int] = None,
path: str = "/",
secure: Optional[bool] = None,
httponly: bool = True,
samesite: Optional[str] = None,
encoder: Callable[[object], str] = json.dumps,
decoder: Callable[[str], Any] = json.loads
) -> None:
super().__init__(
cookie_name=cookie_name,
domain=domain,
max_age=max_age,
path=path,
secure=secure,
httponly=httponly,
samesite=samesite,
encoder=encoder,
decoder=decoder,
)
self._secretbox = nacl.secret.SecretBox(secret_key)
def empty_session(self) -> Session:
return Session(None, data=None, new=True, max_age=self.max_age)
async def load_session(self, request: web.Request) -> Session:
cookie = self.load_cookie(request)
if cookie is None:
return self.empty_session()
else:
try:
data = self._decoder(
self._secretbox.decrypt(
cookie.encode("utf-8"), encoder=Base64Encoder
).decode("utf-8")
)
return Session(None, data=data, new=False, max_age=self.max_age)
except (binascii.Error, nacl.exceptions.CryptoError):
log.warning(
"Cannot decrypt cookie value, " "create a new fresh session"
)
return self.empty_session()
async def save_session(
self, request: web.Request, response: web.StreamResponse, session: Session
) -> None:
if session.empty:
return self.save_cookie(response, "", max_age=session.max_age)
cookie_data = self._encoder(self._get_session_data(session)).encode("utf-8")
nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)
self.save_cookie(
response,
self._secretbox.encrypt(cookie_data, nonce, encoder=Base64Encoder).decode(
"utf-8"
),
max_age=session.max_age,
)
|