1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
|
"""
flask_security.change_email
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Flask-Security Change Email module
:copyright: (c) 2024-2024 by J. Christopher Wagner (jwag).
:license: MIT, see LICENSE for more details.
Allow user to change their email address.
If CHANGE_EMAIL_CONFIRM is set then the user will receive an email
at the new email address with a token that can be used to verify and change
emails. Upon success - if CHANGE_EMAIL_NOTIFY_OLD is set, an email will be sent
to the old email address.
"""
from __future__ import annotations
import typing as t
from flask import after_this_request, request
from flask import current_app
from flask_login import current_user
from wtforms import SubmitField
from .decorators import auth_required
from .forms import (
Form,
UniqueEmailFormMixin,
build_form_from_request,
form_errors_munge,
get_form_field_label,
)
from .proxies import _security, _datastore
from .quart_compat import get_quart_status
from .signals import change_email_instructions_sent, change_email_confirmed
from .utils import (
base_render_json,
check_and_get_token_status,
config_value as cv,
do_flash,
get_message,
get_url,
get_within_delta,
hash_data,
send_mail,
url_for_security,
verify_hash,
view_commit,
)
if t.TYPE_CHECKING: # pragma: no cover
from flask.typing import ResponseValue
if get_quart_status(): # pragma: no cover
from quart import redirect
else:
from flask import redirect
class ChangeEmailForm(Form, UniqueEmailFormMixin):
submit = SubmitField(label=get_form_field_label("submit"))
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.existing_email_user = None
@auth_required(
lambda: cv("API_ENABLED_METHODS"),
within=lambda: cv("FRESHNESS"),
grace=lambda: cv("FRESHNESS_GRACE_PERIOD"),
)
def change_email() -> ResponseValue:
"""Start Change Email for an existing authenticated user"""
payload: dict[str, t.Any]
form: ChangeEmailForm = t.cast(
ChangeEmailForm, build_form_from_request("change_email_form")
)
if form.validate_on_submit():
_send_instructions(current_user, form.email.data)
if not _security._want_json(request):
do_flash(*get_message("CHANGE_EMAIL_SENT", email=form.email.data))
# Drop through..
# All paths get here
if (
request.method == "POST"
and cv("RETURN_GENERIC_RESPONSES")
and form.existing_email_user
):
# Don't let an existing user enumerate registered emails
fields_to_squash: dict[str, dict[str, str]] = dict(email=dict())
form_errors_munge(form, fields_to_squash)
if not form.errors:
# only error is existing email - make it appear the same as if it worked
# except that we don't send anything - while we could inform the email
# that someone is trying to take it over - that could allow a user to
# annoy another user.
if not _security._want_json(request):
do_flash(*get_message("CHANGE_EMAIL_SENT", email=form.email.data))
# TODO - should we have a signal so we can tell application what is
# is going on (otherwise it is pretty much a black-hole).
# drop through - will return a successful response.
if _security._want_json(request):
form.user = current_user
payload = dict(current_email=current_user.email)
return base_render_json(form, additional=payload)
return _security.render_template(
cv("CHANGE_EMAIL_TEMPLATE"),
change_email_form=form,
current_email=current_user.email,
**_security._run_ctx_processor("change_email"),
)
def change_email_confirm(token):
"""
View function which handles a change email confirmation request.
This is always a GET from an email - so for 'spa' must always redirect.
"""
expired, invalid, user, new_email = _verify_token_status(token)
if invalid or expired:
if expired:
m, c = get_message(
"CHANGE_EMAIL_EXPIRED",
within=cv("CHANGE_EMAIL_WITHIN"),
)
else:
m, c = get_message("API_ERROR")
if cv("REDIRECT_BEHAVIOR") == "spa":
return redirect(get_url(cv("CHANGE_EMAIL_ERROR_VIEW"), qparams={c: m}))
do_flash(m, c)
return redirect(
get_url(cv("CHANGE_EMAIL_ERROR_VIEW")) or url_for_security("change_email")
)
_update_user_email(user, new_email)
after_this_request(view_commit)
m, c = get_message("CHANGE_EMAIL_CONFIRMED")
if cv("REDIRECT_BEHAVIOR") == "spa":
return redirect(
get_url(
cv("POST_CHANGE_EMAIL_VIEW"),
qparams=user.get_redirect_qparams({c: m}),
)
)
do_flash(m, c)
return redirect(
get_url(cv("POST_CHANGE_EMAIL_VIEW")) or get_url(cv("POST_LOGIN_VIEW"))
)
def _generate_token(user, new_email):
"""Generates a unique confirmation token for the specified user.
:param user: The user to work with
:param new_email: The requested email
"""
data = [str(user.fs_uniquifier), hash_data(user.email), new_email]
return _security.change_email_serializer.dumps(data)
def _generate_link(user, new_email):
token = _generate_token(user, new_email)
return url_for_security("change_email_confirm", token=token, _external=True), token
def _send_instructions(user, new_email):
"""Sends the change email instructions email for the specified user.
:param user: The user to send the instructions to
:param new_email: The requested new email
"""
link, token = _generate_link(user, new_email)
send_mail(
cv("CHANGE_EMAIL_SUBJECT"),
new_email,
"change_email_instructions",
user=user,
link=link,
token=token,
)
change_email_instructions_sent.send(
current_app._get_current_object(),
_async_wrapper=current_app.ensure_sync,
user=user,
new_email=new_email,
token=token,
)
def _verify_token_status(token):
"""Verify token and contents.
In general, we return pretty generic results - including checking the requested
new_email is still available (and if not return 'invalid').
"""
expired, invalid, state = check_and_get_token_status(
token, "change_email", get_within_delta("CHANGE_EMAIL_WITHIN")
)
if invalid or expired:
return expired, invalid, None, None
fid, hashed_email, new_email = state
user = _datastore.find_user(fs_uniquifier=fid)
if not user or not verify_hash(hashed_email, user.email):
return False, True, None, None
# verify that new_email is still available
if _datastore.find_user(email=new_email):
return False, True, user, None
return expired, invalid, user, new_email
def _update_user_email(user, new_email):
old_email = user.email
user.email = new_email
user.confirmed_at = _security.datetime_factory()
_datastore.put(user)
change_email_confirmed.send(
current_app._get_current_object(),
_async_wrapper=current_app.ensure_sync,
user=user,
old_email=old_email,
)
|