"""Request handlers for the login demo: index, login, logout and two permission-gated pages."""
import html
from textwrap import dedent
from typing import Dict, NoReturn, Optional

from aiohttp import web
from aiohttp_security import (authorized_userid, check_authorized, check_permission, forget,
                              remember)

from .authz import check_credentials
from .users import User
# HTML for the index page. ``{message}`` is the only placeholder and is filled
# in with ``str.format``; the page also carries a login form that POSTs
# ``username``/``password`` to /login and a link to /logout.
index_template = dedent("""
<!doctype html>
<head></head>
<body>
<p>{message}</p>
<form action="/login" method="post">
Login:
<input type="text" name="username">
Password:
<input type="password" name="password">
<input type="submit" value="Login">
</form>
<a href="/logout">Logout</a>
</body>
""")
async def index(request: web.Request) -> web.Response:
    """Render the index page with either a greeting or a login prompt.

    The current user id is obtained from ``authorized_userid``. If it is
    truthy the page greets that user, otherwise it asks the visitor to log
    in. Both variants are rendered from ``index_template``.

    Args:
        request: The incoming aiohttp request.

    Returns:
        A ``text/html`` response containing the rendered template.
    """
    username = await authorized_userid(request)
    if username:
        # The id is interpolated into HTML, so escape it: an id containing
        # markup characters must be displayed as text, not interpreted.
        message = f'Hello, {html.escape(username)}!'
    else:
        message = 'You need to login'
    return web.Response(
        text=index_template.format(message=message),
        content_type='text/html',
    )
async def login(request: web.Request) -> NoReturn:
    """Check the posted credentials and redirect to the index page on success.

    Reads ``username`` and ``password`` from the submitted form and verifies
    them with ``check_credentials`` against the application's ``user_map``.
    This handler never returns normally; every path ends in a raised HTTP
    exception.

    Args:
        request: The incoming aiohttp request carrying the login form.

    Raises:
        web.HTTPFound: Redirect to ``/`` after the credentials were verified
            and ``remember`` was called for the user.
        web.HTTPUnauthorized: If either field is not a string, or the
            credentials were not verified.
    """
    users: Dict[Optional[str], User] = request.app["user_map"]
    rejection = web.HTTPUnauthorized(body="Invalid username / password combination")

    fields = await request.post()
    username, password = fields.get('username'), fields.get('password')

    # Anything that is not a plain string is rejected before any credential check.
    if not isinstance(username, str) or not isinstance(password, str):
        raise rejection

    if not await check_credentials(users, username, password):
        raise rejection

    redirect = web.HTTPFound("/")
    await remember(request, redirect, username)
    raise redirect
async def logout(request: web.Request) -> web.Response:
    """Log the current user out and return a confirmation page.

    ``check_authorized`` is awaited first, so the rest of the handler only
    runs when that check passes. ``forget`` is then awaited with the request
    and the outgoing response before the response is returned.

    Args:
        request: The incoming aiohttp request.

    Returns:
        A ``text/html`` response confirming the logout.
    """
    await check_authorized(request)
    farewell = web.Response(content_type='text/html', text='You have been logged out')
    await forget(request, farewell)
    return farewell
async def internal_page(request: web.Request) -> web.Response:
    """Serve the page that is gated by the ``'public'`` permission.

    Args:
        request: The incoming aiohttp request.

    Returns:
        A ``text/html`` response, produced only after ``check_permission``
        has completed for the ``'public'`` permission.
    """
    await check_permission(request, 'public')
    return web.Response(
        content_type='text/html',
        text='This page is visible for all registered users',
    )
async def protected_page(request: web.Request) -> web.Response:
    """Serve the page that is gated by the ``'protected'`` permission.

    Args:
        request: The incoming aiohttp request.

    Returns:
        A ``text/html`` response, produced only after ``check_permission``
        has completed for the ``'protected'`` permission.
    """
    await check_permission(request, 'protected')
    return web.Response(
        content_type='text/html',
        text='You are on protected page',
    )
def configure_handlers(app: web.Application) -> None:
    """Register every handler of this module on the application's router.

    Args:
        app: The aiohttp application whose router receives the routes.
    """
    router = app.router
    # (registration method, path, handler, route name) in registration order.
    route_table = (
        (router.add_get, '/', index, 'index'),
        (router.add_post, '/login', login, 'login'),
        (router.add_get, '/logout', logout, 'logout'),
        (router.add_get, '/public', internal_page, 'public'),
        (router.add_get, '/protected', protected_page, 'protected'),
    )
    for register, path, handler, route_name in route_table:
        register(path, handler, name=route_name)
|