1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
|
import base64
import functools
from http import HTTPStatus
from typing import Any, Awaitable, Callable

from aiohttp import web
from cryptography import fernet

from aiohttp_session import get_session, new_session, setup
from aiohttp_session.cookie_storage import EncryptedCookieStorage
# In-memory stand-in for a user table: each row is a (name, password) pair and
# a user's id is the index of its row in this list.
DATABASE = [
    ("admin", "admin"),
]

# Call signature of the request handlers that `login_required` wraps.
_Handler = Callable[[web.Request], Awaitable[web.StreamResponse]]

# Application key under which `login_required` stores the DATABASE row of the
# authenticated user. The stored value is a (name, password) tuple, so the key
# is typed as `tuple` rather than `str`.
user_key = web.AppKey("user", tuple)
def login_required(fn: _Handler) -> _Handler:
    """Decorate a handler so that it only runs for logged-in sessions.

    The wrapper looks up ``"user_id"`` in the request's session. When the key
    is absent it raises ``web.HTTPFound`` pointing at the route named
    ``"login"``; otherwise it loads the matching ``DATABASE`` row, stores it
    under ``user_key`` on the application and awaits *fn* with the original
    arguments.
    """

    # functools.wraps copies fn's __name__/__doc__ onto the wrapper so the
    # decorated handler stays identifiable in logs and introspection.
    @functools.wraps(fn)
    async def wrapped(
        request: web.Request, *args: Any, **kwargs: Any
    ) -> web.StreamResponse:
        app = request.app
        router = app.router

        session = await get_session(request)

        if "user_id" not in session:
            raise web.HTTPFound(router["login"].url_for())

        user_id = session["user_id"]
        # actually load user from your database (e.g. with aiopg)
        user = DATABASE[user_id]
        # NOTE(review): the user is stored on the shared application object,
        # not on the request -- confirm this is intended when several
        # requests are served concurrently.
        app[user_key] = user
        return await fn(request, *args, **kwargs)

    return wrapped
@login_required
async def handler(request: web.Request) -> web.Response:
    """Return a plain-text response naming the user stored under ``user_key``."""
    current_user = request.app[user_key]
    body = f"User {current_user} authorized"
    return web.Response(text=body)
# HTML for the login form served by `login_page`. The form POSTs the fields
# `name` and `password` to the relative URL `login`.
tmpl = """\
<html>
<body>
<form method="post" action="login">
<label>Name:</label><input type="text" name="name"/>
<label>Password:</label><input type="password" name="password"/>
<input type="submit" value="Login"/>
</form>
</body>
</html>"""
async def login_page(request: web.Request) -> web.Response:
    """Serve the static login form held in ``tmpl`` as an HTML response."""
    page = web.Response(text=tmpl, content_type="text/html")
    return page
async def login(request: web.Request) -> web.Response:
    """Check posted credentials and start a fresh session on success.

    Reads the ``name`` and ``password`` form fields and looks the pair up in
    ``DATABASE``.

    Returns:
        A 403 response with the text "No such user" when the pair is not in
        ``DATABASE`` (including when either field is missing from the form).

    Raises:
        web.HTTPFound: on success, redirecting to the route named
            ``"restricted"`` after ``user_id`` has been written to a new
            session.
    """
    form = await request.post()
    # .get() instead of [] so a POST that omits a field is rejected as bad
    # credentials rather than escaping as an unhandled KeyError.
    user_signature = (form.get("name"), form.get("password"))

    # actually implement business logic to check user credentials
    # Only the lookup is guarded: a ValueError raised by the session code
    # below must not be reported as "No such user".
    try:
        user_id = DATABASE.index(user_signature)  # type: ignore[arg-type]
    except ValueError:
        return web.Response(text="No such user", status=HTTPStatus.FORBIDDEN)

    # Always use `new_session` during login to guard against
    # Session Fixation. See aiohttp-session#281
    session = await new_session(request)
    session["user_id"] = user_id
    raise web.HTTPFound(request.app.router["restricted"].url_for())
def make_app() -> web.Application:
    """Build the demo application: encrypted cookie sessions plus routes.

    Registers ``handler`` at ``/`` (route name ``"restricted"``), the login
    form at ``GET /login`` (route name ``"login"``) and the credential check
    at ``POST /login``.
    """
    # A key is generated anew on every call and url-safe-base64 decoded
    # before being handed to the cookie storage.
    raw_key = base64.urlsafe_b64decode(fernet.Fernet.generate_key())
    storage = EncryptedCookieStorage(raw_key)

    application = web.Application()
    setup(application, storage)

    routes = application.router
    routes.add_get("/", handler, name="restricted")
    routes.add_get("/login", login_page, name="login")
    routes.add_post("/login", login)
    return application
# Build the application and hand it to aiohttp's runner. This statement sits
# at module top level with no `__main__` guard, so it also executes on import.
web.run_app(make_app())
|