File: twofactor.py

package info (click to toggle)
flask-security 5.6.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,448 kB
  • sloc: python: 23,247; javascript: 204; makefile: 138
file content (240 lines) | stat: -rw-r--r-- 8,414 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
"""
flask_security.two_factor
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Flask-Security two_factor module

:copyright: (c) 2016 by Gal Stainfeld, at Emedgene
:copyright: (c) 2019-2024 by J. Christopher Wagner (jwag).
"""

from __future__ import annotations

import typing as t

from flask import current_app, redirect, request, session

from .forms import (
    get_form_field_xlate,
    DummyForm,
    TwoFactorRescueForm,
)
from .proxies import _security, _datastore
from .tf_plugin import TfPluginBase, tf_clean_session
from .utils import (
    _,
    SmsSenderFactory,
    base_render_json,
    config_value as cv,
    do_flash,
    get_message,
    json_error_response,
    send_mail,
    url_for_security,
)
from .signals import (
    tf_code_confirmed,
    tf_disabled,
    tf_security_token_sent,
    tf_profile_changed,
)

if t.TYPE_CHECKING:  # pragma: no cover
    import flask
    from flask_security import Security, UserMixin
    from flask.typing import ResponseValue


def tf_send_security_token(user, method, totp_secret, phone_number):
    """Generate a one-time code and deliver it to the user via ``method``.

    :param user: The user the code is meant for
    :param method: How to deliver the code. 'email' (or 'mail') sends a mail,
                   'sms' sends a text message. Any other value
                   (e.g. 'authenticator') delivers nothing.
    :param totp_secret: The user's shared secret the code is generated from
    :param phone_number: Destination number - only used when method is 'sms'

    Nothing is returned. Exceptions raised by the underlying mail/SMS senders
    are not caught here - callers are expected to handle them.

    The ``tf_security_token_sent`` signal is always sent; both its ``token`` and
    ``login_token`` values are None when no code was delivered.

    Flask-Security code should NOT call this directly -
    call :meth:`.UserMixin.tf_send_security_token`
    """
    code = _security.totp_factory.generate_totp_password(totp_secret)

    if method in ("email", "mail"):
        send_mail(
            cv("EMAIL_SUBJECT_TWO_FACTOR"),
            user.email,
            "two_factor_instructions",
            user=user,
            token=code,
            username=user.calc_username(),
        )
    elif method == "sms":
        # get_message returns a (message, category) pair - only the text is sent.
        text, _category = get_message("USE_CODE", code=code)
        origin = cv("SMS_SERVICE_CONFIG")["PHONE_NUMBER"]
        sender = SmsSenderFactory.createSender(cv("SMS_SERVICE"))
        sender.send_sms(from_number=origin, to_number=phone_number, msg=text)
    else:
        # Nothing to deliver (e.g. authenticator apps compute the code themselves),
        # so don't expose the generated code to signal receivers.
        code = None

    app = current_app._get_current_object()
    tf_security_token_sent.send(
        app,
        _async_wrapper=current_app.ensure_sync,
        user=user,
        method=method,
        token=code,
        login_token=code,
        phone_number=phone_number,
    )


def complete_two_factor_process(user, primary_method, totp_secret, is_changing):
    """Persist the user's two-factor settings and finish the current flow.

    :param user: The user completing two-factor
    :param primary_method: The two-factor method to store for the user
    :param totp_secret: The shared secret to store for the user
    :param is_changing: Truthy when the user is changing their two-factor method
                        (legacy, session based path as of 5.5.0); falsy when the
                        user is completing a login.

    :return: tuple of (message key, token) where token is whatever
             ``tf_complete`` of the registered two-factor plugins returns.

    Sends ``tf_profile_changed`` when changing method, ``tf_code_confirmed``
    otherwise.
    """
    _datastore.tf_set(user, primary_method, totp_secret=totp_secret)

    # Pick the signal/message pair matching the flow we are in.
    if is_changing:
        signal = tf_profile_changed
        completion_message = "TWO_FACTOR_CHANGE_METHOD_SUCCESSFUL"
    else:
        signal = tf_code_confirmed
        completion_message = "TWO_FACTOR_LOGIN_SUCCESSFUL"

    signal.send(
        current_app._get_current_object(),
        _async_wrapper=current_app.ensure_sync,
        user=user,
        method=primary_method,
    )

    # Only the login flow asks the plugins to actually log the user in.
    token = _security.two_factor_plugins.tf_complete(user, not is_changing)
    return completion_message, token


def set_rescue_options(form: TwoFactorRescueForm, user: UserMixin) -> dict[str, str]:
    """Set up the rescue (recovery) options available based on config.

    Note that this modifies the passed in form (choices are appended to
    ``form.help_setup``) as well as returns a dict that can be returned as part
    of a JSON response.

    :param form: The rescue form whose ``help_setup`` choices are extended
    :param user: The user - used to check whether recovery codes exist
    :return: mapping of option name to the URL handling that option
    """
    options = {"help": url_for_security("two_factor_rescue")}

    if cv("TWO_FACTOR_RESCUE_EMAIL"):
        options["email"] = url_for_security("two_factor_rescue")
        email_choices = form.help_setup.choices
        assert isinstance(email_choices, list)
        email_label = get_form_field_xlate(_("Send code via email"))
        email_choices.append(("email", email_label))

    # Recovery codes are offered only if MFA support is on, the feature is
    # configured, and this user actually has codes stored.
    can_use_recovery_code = (
        _security.support_mfa
        and cv("MULTI_FACTOR_RECOVERY_CODES")
        and _datastore.mf_get_recovery_codes(user)
    )
    if can_use_recovery_code:
        options["recovery_code"] = url_for_security("mf_recovery")
        code_choices = form.help_setup.choices
        assert isinstance(code_choices, list)
        code_label = get_form_field_xlate(_("Use previously downloaded recovery code"))
        code_choices.append(("recovery_code", code_label))

    return options


def tf_disable(user):
    """Turn off two-factor authentication for ``user``.

    Cleans two-factor state from the session, resets the user's two-factor
    data in the datastore, then sends the ``tf_disabled`` signal.
    """
    tf_clean_session()
    _datastore.tf_reset(user)

    app = current_app._get_current_object()
    tf_disabled.send(app, _async_wrapper=current_app.ensure_sync, user=user)


def is_tf_setup(user) -> bool:
    """Return True if the user account is set up for 2FA.

    A user counts as set up only when both a TOTP secret and a primary
    two-factor method are stored (i.e. both attributes are truthy).

    :param user: object exposing ``tf_totp_secret`` and ``tf_primary_method``
    :return: a real ``bool`` - never the underlying attribute values
    """
    # ``a and b`` evaluates to one of its operands, so coerce explicitly to
    # honor the documented True/False contract.
    return bool(user.tf_totp_secret and user.tf_primary_method)


class CodeTfPlugin(TfPluginBase):
    """Two-factor plugin driving the code based two-factor login flow."""

    def __init__(self, app: flask.Flask):
        super().__init__(app)

    def create_blueprint(
        self, app: flask.Flask, bp: flask.Blueprint, state: Security
    ) -> None:
        """Nothing to register - this plugin adds no routes here."""
        return None

    def get_setup_methods(self, user: UserMixin) -> list[str]:
        """Return the user's primary two-factor method as a one element list,
        or an empty list if two-factor isn't set up for the user.
        """
        if not is_tf_setup(user):
            return []
        method = user.tf_primary_method
        assert method is not None
        return [method]

    def tf_login(
        self, user: UserMixin, json_payload: dict[str, t.Any], next_loc: str | None
    ) -> ResponseValue:
        """Start the two-factor part of a login for an already verified user.

        This is called only when login/password have already been validated.
        This can be from login, register, confirm, unified sign in, unified magic link.

        When two-factor is set up, a code is sent if the user's method is one of
        mail/email/sms, and the user is pointed at token validation.
        When it isn't set up, the user is pointed at two-factor-setup.
        The user is NOT logged in by either path - state is tracked via
        ``session["tf_state"]`` (mirrored into ``json_payload``, which is modified
        in place).

        :param user: The user that passed primary authentication
        :param json_payload: dict to add two-factor state to for JSON responses
        :param next_loc: If set, passed as ``next`` to the token validation URL
        :return: a redirect for form based requests, else a JSON response
        """
        if is_tf_setup(user):
            # Two-factor is configured - next step is entering a code.
            session["tf_state"] = "ready"
            json_payload.update(
                tf_state="ready",
                tf_primary_method=user.tf_primary_method,
                tf_method=user.tf_primary_method,
            )

            if user.tf_primary_method in ("mail", "email", "sms"):
                error = user.tf_send_security_token(
                    method=user.tf_primary_method,
                    totp_secret=user.tf_totp_secret,
                    phone_number=getattr(user, "tf_phone_number", None),
                )
                if error:
                    # Sending the code failed. We are deep down in the
                    # login/unified sign in flow so the best we can do is
                    # report the error and bail.
                    if _security._want_json(request):
                        failure = json_error_response(errors=error)
                        return _security._render_json(failure, 500, None, None)
                    do_flash(error, "error")
                    return redirect(url_for_security("login"))

            if not _security._want_json(request):
                query = {"next": next_loc} if next_loc else {}
                return redirect(
                    url_for_security("two_factor_token_validation", **query)
                )
        else:
            # Two-factor not configured yet - user has to go through setup.
            session["tf_state"] = "setup_from_login"
            json_payload["tf_state"] = "setup_from_login"
            if not _security._want_json(request):
                return redirect(url_for_security("two_factor_setup"))

        # JSON response - Fake up a form - doesn't really matter which.
        placeholder_form = DummyForm(formdata=None)
        return base_render_json(
            placeholder_form, include_user=False, additional=json_payload
        )