1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
"""
flask_security.recoverable
~~~~~~~~~~~~~~~~~~~~~~~~~~
Flask-Security recoverable module
:copyright: (c) 2012 by Matt Wright.
:copyright: (c) 2019-2025 by J. Christopher Wagner (jwag).
:license: MIT, see LICENSE for more details.
"""
from flask import current_app
from .proxies import _security, _datastore
from .signals import (
password_reset,
reset_password_instructions_sent,
username_recovery_email_sent,
)
from .utils import (
config_value,
get_token_status,
hash_data,
hash_password,
send_mail,
url_for_security,
verify_hash,
)
def generate_reset_link(user):
    """Creates a reset password token for the user and the link that carries it.

    :param user: The user to generate the reset link for
    :return: A ``(reset_link, token)`` tuple, where ``reset_link`` is the
        ``reset_password`` URL (requested with ``_external=True``) built
        around ``token``
    """
    reset_token = generate_reset_password_token(user)
    link = url_for_security("reset_password", token=reset_token, _external=True)
    return link, reset_token
def send_reset_password_instructions(user):
    """Sends the reset password instructions email for the specified user.

    A fresh reset link and token are always generated. The email itself is
    only sent when ``SEND_PASSWORD_RESET_EMAIL`` is enabled, whereas the
    ``reset_password_instructions_sent`` signal is emitted in either case.

    :param user: The user to send the instructions to
    """
    link, reset_token = generate_reset_link(user)

    if config_value("SEND_PASSWORD_RESET_EMAIL"):
        subject = config_value("EMAIL_SUBJECT_PASSWORD_RESET")
        send_mail(
            subject,
            user.email,
            "reset_instructions",
            user=user,
            reset_link=link,
            reset_token=reset_token,
        )

    # The same token value is published under both the ``token`` and the
    # ``reset_token`` keyword so receivers may rely on either name.
    app = current_app._get_current_object()
    reset_password_instructions_sent.send(
        app,
        _async_wrapper=current_app.ensure_sync,
        user=user,
        token=reset_token,
        reset_token=reset_token,
    )
def send_password_reset_notice(user):
    """Sends the password reset notice email for the specified user.

    Nothing is sent unless ``SEND_PASSWORD_RESET_NOTICE_EMAIL`` is enabled.

    :param user: The user to send the notice to
    """
    if not config_value("SEND_PASSWORD_RESET_NOTICE_EMAIL"):
        return

    subject = config_value("EMAIL_SUBJECT_PASSWORD_NOTICE")
    send_mail(subject, user.email, "reset_notice", user=user)
def generate_reset_password_token(user):
    """Generates a unique reset password token for the specified user.

    The serialized payload is ``[fs_uniquifier, password_hash]``, where
    ``password_hash`` is ``hash_data(user.password)`` or ``None`` when the
    user currently has no password.

    :param user: The user to work with
    :return: The payload serialized with ``_security.reset_serializer``
    """
    if user.password:
        password_hash = hash_data(user.password)
    else:
        # No password set - there is nothing to hash.
        password_hash = None

    payload = [str(user.fs_uniquifier), password_hash]
    return _security.reset_serializer.dumps(payload)
def reset_password_token_status(token):
    """Returns the expired status, invalid status, and user of a password reset
    token. For example::

        expired, invalid, user = reset_password_token_status('...')

    :param token: The password reset token
    :return: An ``(expired, invalid, user)`` tuple. The data decoded from the
        token is used internally and is NOT part of the return value.
    """
    expired, invalid, user, data = get_token_status(
        token, "reset", "RESET_PASSWORD", return_data=True
    )

    # This check looks to see if the password has been changed since the reset
    # token was created (data[1] is the password hash stored in the token).
    # As of #338 we reset the fs_uniquifier on each password change, so such a
    # token would already have been marked invalid above. The check originally
    # made sure that the token couldn't be used twice.
    # TODO - look at removing this entire check.
    if not invalid and user and user.password:
        if not verify_hash(data[1], user.password):
            invalid = True

    return expired, invalid, user
def update_password(user, password):
    """Update the specified user's password

    Stores the hashed password on the user, resets the user's uniquifier,
    saves the user through the datastore, sends the password reset notice
    and finally emits the ``password_reset`` signal.

    :param user: The user to update_password
    :param password: The unhashed new password
    """
    new_hash = hash_password(password)
    user.password = new_hash

    # Change uniquifier - this will cause ALL sessions to be invalidated.
    _datastore.set_uniquifier(user)
    _datastore.put(user)

    send_password_reset_notice(user)

    app = current_app._get_current_object()
    password_reset.send(
        app,
        _async_wrapper=current_app.ensure_sync,
        user=user,
    )
def send_username_recovery_email(user):
    """Sends the username recovery email for the specified user.

    The email is only sent when ``USERNAME_RECOVERY`` is enabled; the
    ``username_recovery_email_sent`` signal is emitted afterwards.

    :param user: The user requesting username recovery
    """
    if config_value("USERNAME_RECOVERY"):
        subject = config_value("EMAIL_SUBJECT_USERNAME_RECOVERY")
        send_mail(
            subject,
            user.email,
            "username_recovery",
            user=user,
            username=user.username,
        )

    app = current_app._get_current_object()
    username_recovery_email_sent.send(
        app,
        _async_wrapper=current_app.ensure_sync,
        user=user,
    )
|