File: change_email.py

package info (click to toggle)
flask-security 5.6.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,448 kB
  • sloc: python: 23,247; javascript: 204; makefile: 138
file content (232 lines) | stat: -rw-r--r-- 7,318 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
"""
flask_security.change_email
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Flask-Security Change Email module

:copyright: (c) 2024-2024 by J. Christopher Wagner (jwag).
:license: MIT, see LICENSE for more details.

Allow user to change their email address.
If CHANGE_EMAIL_CONFIRM is set then the user will receive an email
at the new email address with a token that can be used to verify and change
emails. Upon success - if CHANGE_EMAIL_NOTIFY_OLD is set, an email will be sent
to the old email address.
"""

from __future__ import annotations

import typing as t

from flask import after_this_request, request
from flask import current_app
from flask_login import current_user
from wtforms import SubmitField

from .decorators import auth_required
from .forms import (
    Form,
    UniqueEmailFormMixin,
    build_form_from_request,
    form_errors_munge,
    get_form_field_label,
)
from .proxies import _security, _datastore
from .quart_compat import get_quart_status
from .signals import change_email_instructions_sent, change_email_confirmed
from .utils import (
    base_render_json,
    check_and_get_token_status,
    config_value as cv,
    do_flash,
    get_message,
    get_url,
    get_within_delta,
    hash_data,
    send_mail,
    url_for_security,
    verify_hash,
    view_commit,
)

if t.TYPE_CHECKING:  # pragma: no cover
    from flask.typing import ResponseValue

if get_quart_status():  # pragma: no cover
    from quart import redirect
else:
    from flask import redirect


class ChangeEmailForm(Form, UniqueEmailFormMixin):
    submit = SubmitField(label=get_form_field_label("submit"))

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.existing_email_user = None


@auth_required(
    lambda: cv("API_ENABLED_METHODS"),
    within=lambda: cv("FRESHNESS"),
    grace=lambda: cv("FRESHNESS_GRACE_PERIOD"),
)
def change_email() -> ResponseValue:
    """Start Change Email for an existing authenticated user"""
    payload: dict[str, t.Any]

    form: ChangeEmailForm = t.cast(
        ChangeEmailForm, build_form_from_request("change_email_form")
    )

    if form.validate_on_submit():
        _send_instructions(current_user, form.email.data)
        if not _security._want_json(request):
            do_flash(*get_message("CHANGE_EMAIL_SENT", email=form.email.data))
        # Drop through..

    # All paths get here
    if (
        request.method == "POST"
        and cv("RETURN_GENERIC_RESPONSES")
        and form.existing_email_user
    ):
        # Don't let an existing user enumerate registered emails
        fields_to_squash: dict[str, dict[str, str]] = dict(email=dict())
        form_errors_munge(form, fields_to_squash)
        if not form.errors:
            # only error is existing email - make it appear the same as if it worked
            # except that we don't send anything - while we could inform the email
            # that someone is trying to take it over - that could allow a user to
            # annoy another user.
            if not _security._want_json(request):
                do_flash(*get_message("CHANGE_EMAIL_SENT", email=form.email.data))
            # TODO - should we have a signal so we can tell application what is
            # is going on (otherwise it is pretty much a black-hole).
            # drop through - will return a successful response.

    if _security._want_json(request):
        form.user = current_user
        payload = dict(current_email=current_user.email)
        return base_render_json(form, additional=payload)

    return _security.render_template(
        cv("CHANGE_EMAIL_TEMPLATE"),
        change_email_form=form,
        current_email=current_user.email,
        **_security._run_ctx_processor("change_email"),
    )


def change_email_confirm(token):
    """
    View function which handles a change email confirmation request.
    This is always a GET from an email - so for 'spa' must always redirect.
    """
    expired, invalid, user, new_email = _verify_token_status(token)

    if invalid or expired:
        if expired:
            m, c = get_message(
                "CHANGE_EMAIL_EXPIRED",
                within=cv("CHANGE_EMAIL_WITHIN"),
            )
        else:
            m, c = get_message("API_ERROR")
        if cv("REDIRECT_BEHAVIOR") == "spa":
            return redirect(get_url(cv("CHANGE_EMAIL_ERROR_VIEW"), qparams={c: m}))
        do_flash(m, c)
        return redirect(
            get_url(cv("CHANGE_EMAIL_ERROR_VIEW")) or url_for_security("change_email")
        )

    _update_user_email(user, new_email)
    after_this_request(view_commit)
    m, c = get_message("CHANGE_EMAIL_CONFIRMED")
    if cv("REDIRECT_BEHAVIOR") == "spa":
        return redirect(
            get_url(
                cv("POST_CHANGE_EMAIL_VIEW"),
                qparams=user.get_redirect_qparams({c: m}),
            )
        )
    do_flash(m, c)
    return redirect(
        get_url(cv("POST_CHANGE_EMAIL_VIEW")) or get_url(cv("POST_LOGIN_VIEW"))
    )


def _generate_token(user, new_email):
    """Generates a unique confirmation token for the specified user.

    :param user: The user to work with
    :param new_email: The requested email
    """
    data = [str(user.fs_uniquifier), hash_data(user.email), new_email]
    return _security.change_email_serializer.dumps(data)


def _generate_link(user, new_email):
    token = _generate_token(user, new_email)
    return url_for_security("change_email_confirm", token=token, _external=True), token


def _send_instructions(user, new_email):
    """Sends the change email instructions email for the specified user.

    :param user: The user to send the instructions to
    :param new_email: The requested new email
    """

    link, token = _generate_link(user, new_email)

    send_mail(
        cv("CHANGE_EMAIL_SUBJECT"),
        new_email,
        "change_email_instructions",
        user=user,
        link=link,
        token=token,
    )

    change_email_instructions_sent.send(
        current_app._get_current_object(),
        _async_wrapper=current_app.ensure_sync,
        user=user,
        new_email=new_email,
        token=token,
    )


def _verify_token_status(token):
    """Verify token and contents.
    In general, we return pretty generic results - including checking the requested
    new_email is still available (and if not return 'invalid').
    """
    expired, invalid, state = check_and_get_token_status(
        token, "change_email", get_within_delta("CHANGE_EMAIL_WITHIN")
    )
    if invalid or expired:
        return expired, invalid, None, None
    fid, hashed_email, new_email = state
    user = _datastore.find_user(fs_uniquifier=fid)
    if not user or not verify_hash(hashed_email, user.email):
        return False, True, None, None

    # verify that new_email is still available
    if _datastore.find_user(email=new_email):
        return False, True, user, None
    return expired, invalid, user, new_email


def _update_user_email(user, new_email):
    old_email = user.email
    user.email = new_email
    user.confirmed_at = _security.datetime_factory()
    _datastore.put(user)
    change_email_confirmed.send(
        current_app._get_current_object(),
        _async_wrapper=current_app.ensure_sync,
        user=user,
        old_email=old_email,
    )