1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
|
"""
flask_security.registerable
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Flask-Security registerable module
:copyright: (c) 2012 by Matt Wright.
:copyright: (c) 2019-2025 by J. Christopher Wagner (jwag).
:license: MIT, see LICENSE for more details.
"""
from __future__ import annotations
import typing as t
from flask import current_app
from .confirmable import generate_confirmation_link, requires_confirmation
from .forms import form_errors_munge
from .proxies import _security, _datastore
from .recoverable import generate_reset_link
from .signals import user_registered, user_not_registered
from .utils import (
config_value as cv,
do_flash,
get_message,
hash_password,
send_mail,
url_for_security,
)
if t.TYPE_CHECKING: # pragma: no cover
from .forms import ConfirmRegisterForm, RegisterForm, RegisterFormV2
def register_user(registration_form):
    """
    Create a user from the submitted registration form and run the
    post-registration steps: optional confirmation link, the
    ``user_registered`` signal and the optional welcome email.

    :param registration_form: form with user registration data
    :return: user instance
    """
    new_user_fields = registration_form.to_dict(only_user=True)

    # A password may legitimately be empty (UNIFIED_SIGNIN with
    # PASSWORD_REQUIRED=False) - only hash when one was actually supplied.
    if new_user_fields["password"]:
        new_user_fields["password"] = hash_password(new_user_fields["password"])
    user = _datastore.create_user(**new_user_fields)

    # No password given: when UNIFIED_SIGNIN is enabled, auto-setup email
    # magic links for the new account.
    passwordless = not new_user_fields["password"]
    if passwordless and cv("UNIFIED_SIGNIN"):
        _datastore.us_setup_email(user)

    if _security.confirmable:
        confirmation_link, token = generate_confirmation_link(user)
        do_flash(*get_message("CONFIRM_REGISTRATION", email=user.email))
    else:
        confirmation_link = token = None

    # The token is published under both 'confirm_token' and
    # 'confirmation_token'; form_data carries the full (not user-only) form.
    app = current_app._get_current_object()
    signal_payload = dict(
        _async_wrapper=current_app.ensure_sync,
        user=user,
        confirm_token=token,
        confirmation_token=token,
        form_data=registration_form.to_dict(only_user=False),
    )
    user_registered.send(app, **signal_payload)

    if not cv("SEND_REGISTER_EMAIL"):
        return user

    send_mail(
        cv("EMAIL_SUBJECT_REGISTER"),
        user.email,
        "welcome",
        user=user,
        confirmation_link=confirmation_link,
        confirmation_token=token,
    )
    return user
def register_existing(
    form: ConfirmRegisterForm | RegisterForm | RegisterFormV2,
) -> bool:
    """
    In the case of generic responses we want to mitigate any possible
    email/username enumeration.

    For an existing email we send an email to that address and tell them they
    are already registered (and provide their username if any).

    N.B. This (and forgot and confirm) could be used to DDoS an email address
    by constantly issuing requests. One way to mitigate that is to use signals
    and add specific application code.

    :param form: the registration form that failed validation. Its
        ``existing_email_user`` / ``existing_username_user`` attributes are
        read to decide whether the failure was caused by an already taken
        email/username.
    :return: ``False`` means the caller should return normal error messages:
        generic responses are disabled, no existing email/username was
        found, or the form has other (non-compliance) errors.
        ``True`` means the only 'error' was an existing email/username. In
        that case a normal registration is simulated: the
        ``user_not_registered`` signal is sent and, if ``SEND_REGISTER_EMAIL``
        is set, an informational email is sent.
    """
    # Only act when generic responses are enabled AND the failure involves an
    # existing email/username. The form is checked first so that the config
    # lookup is skipped entirely when there is nothing to hide.
    if not (
        (form.existing_email_user or form.existing_username_user)
        and cv("RETURN_GENERIC_RESPONSES")
    ):  # pragma: no cover
        return False

    # There are 2 classes of error - an existing email/username and non-compliant
    # email/username/password. We want to give the user feedback on a non-compliant
    # input - but not give away whether the email/username is already taken.
    # Since in this case we have an 'existing' entry - we simply null out those
    # errors.
    # This also means for JSON there is no way to tell if things worked or not.
    fields_to_squash: dict[str, dict[str, str]] = {}
    if form.existing_email_user:
        fields_to_squash["email"] = {}
    if hasattr(form, "username") and form.existing_username_user:
        fields_to_squash["username"] = {}
    form_errors_munge(form, fields_to_squash)
    if form.errors:
        # some other illegal password/username - return an error
        return False

    # only errors were existing email/username
    hash_password("not-a-password")  # reduce timing between successful and not.

    # Same as is done in register_user()
    if _security.confirmable:
        do_flash(*get_message("CONFIRM_REGISTRATION", email=form.email.data))

    # 2 cases:
    # 1) existing email (an already registered account) with an empty or same username
    # 2) new email with existing username (which corresponds to some OTHER account)
    if form.existing_email_user:
        user_not_registered.send(
            current_app._get_current_object(),  # type: ignore[attr-defined]
            _async_wrapper=current_app.ensure_sync,  # type: ignore[arg-type]
            user=form.existing_email_user,
            existing_email=True,
            existing_username=form.existing_username_user is not None,
            form_data=form.to_dict(only_user=False),
        )
        if cv("SEND_REGISTER_EMAIL"):
            # Send a nice email saying they are already registered -
            # - tell them their existing username if they have one
            # - suggest how to reset password and send reset link/token
            # - if they haven't confirmed - send them confirm link and token
            recovery_link = ""
            reset_link = ""
            reset_token = ""
            if _security.recoverable:
                recovery_link = url_for_security("forgot_password", _external=True)
                reset_link, reset_token = generate_reset_link(form.existing_email_user)

            confirmation_link = ""
            confirmation_token = ""
            if requires_confirmation(form.existing_email_user):
                confirmation_link, confirmation_token = generate_confirmation_link(
                    form.existing_email_user
                )
            send_mail(
                cv("EMAIL_SUBJECT_REGISTER"),
                form.existing_email_user.email,
                "welcome_existing",
                user=form.existing_email_user,
                recovery_link=recovery_link,
                reset_link=reset_link,
                reset_token=reset_token,
                confirmation_link=confirmation_link,
                confirmation_token=confirmation_token,
            )
    elif form.existing_username_user:
        # New email, already taken username.
        # Note that we send email to NEW email - so it is possible for a bad-actor
        # to enumerate usernames (slowly).
        user_not_registered.send(
            current_app._get_current_object(),  # type: ignore[attr-defined]
            _async_wrapper=current_app.ensure_sync,  # type: ignore[arg-type]
            user=None,
            existing_email=False,
            existing_username=True,
            form_data=form.to_dict(only_user=False),
        )
        if cv("SEND_REGISTER_EMAIL"):
            send_mail(
                cv("EMAIL_SUBJECT_REGISTER"),
                form.email.data,
                "welcome_existing_username",
                email=form.email.data,
                username=form.username.data if hasattr(form, "username") else None,
            )
    return True
|