File: twofactor.py

package info (click to toggle)
flask-security 4.0.0-1%2Bdeb11u1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 2,340 kB
  • sloc: python: 12,730; makefile: 131
file content (254 lines) | stat: -rw-r--r-- 8,669 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
"""
    flask_security.two_factor
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~

    Flask-Security two_factor module

    :copyright: (c) 2016 by Gal Stainfeld, at Emedgene
    :copyright: (c) 2019-2020 by J. Christopher Wagner (jwag).
"""

from flask import current_app as app, redirect, request, session
from werkzeug.datastructures import MultiDict
from werkzeug.local import LocalProxy

from .utils import (
    SmsSenderFactory,
    base_render_json,
    check_and_get_token_status,
    config_value,
    do_flash,
    get_within_delta,
    login_user,
    json_error_response,
    send_mail,
    url_for_security,
)
from .signals import (
    tf_code_confirmed,
    tf_disabled,
    tf_security_token_sent,
    tf_profile_changed,
)

# Convenient references
_security = LocalProxy(lambda: app.extensions["security"])
_datastore = LocalProxy(lambda: _security.datastore)


def tf_clean_session():
    """
    Clean out ALL stuff stored in session (e.g. on logout)
    """
    if config_value("TWO_FACTOR"):
        for k in [
            "tf_state",
            "tf_user_id",
            "tf_primary_method",
            "tf_remember_login",
            "tf_totp_secret",
        ]:
            session.pop(k, None)


def tf_send_security_token(user, method, totp_secret, phone_number):
    """Sends the security token via email/sms for the specified user.

    :param user: The user to send the code to
    :param method: The method in which the code will be sent
                   ('email' or 'sms', or 'authenticator') at the moment
    :param totp_secret: a unique shared secret of the user
    :param phone_number: If 'sms' phone number to send to

    There is no return value - it is assumed that exceptions are thrown by underlying
    methods that callers can catch.

    Flask-Security code should NOT call this directly -
    call :meth:`.UserMixin.tf_send_security_token`
    """
    token_to_be_sent = _security._totp_factory.generate_totp_password(totp_secret)
    if method == "email" or method == "mail":
        send_mail(
            config_value("EMAIL_SUBJECT_TWO_FACTOR"),
            user.email,
            "two_factor_instructions",
            user=user,
            token=token_to_be_sent,
            username=user.calc_username(),
        )
    elif method == "sms":
        msg = "Use this code to log in: %s" % token_to_be_sent
        from_number = config_value("SMS_SERVICE_CONFIG")["PHONE_NUMBER"]
        to_number = phone_number
        sms_sender = SmsSenderFactory.createSender(config_value("SMS_SERVICE"))
        sms_sender.send_sms(from_number=from_number, to_number=to_number, msg=msg)

    elif method == "google_authenticator" or method == "authenticator":
        # password are generated automatically in the authenticator apps
        pass
    tf_security_token_sent.send(
        app._get_current_object(),
        user=user,
        method=method,
        token=token_to_be_sent,
        phone_number=phone_number,
    )


def complete_two_factor_process(
    user, primary_method, totp_secret, is_changing, remember_login=None
):
    """clean session according to process (login or changing two-factor method)
    and perform action accordingly
    """

    _datastore.tf_set(user, primary_method, totp_secret=totp_secret)

    # if we are changing two-factor method
    if is_changing:
        completion_message = "TWO_FACTOR_CHANGE_METHOD_SUCCESSFUL"
        tf_profile_changed.send(
            app._get_current_object(), user=user, method=primary_method
        )
    # if we are logging in for the first time
    else:
        completion_message = "TWO_FACTOR_LOGIN_SUCCESSFUL"
        tf_code_confirmed.send(
            app._get_current_object(), user=user, method=primary_method
        )
        login_user(user, remember=remember_login)
    tf_clean_session()
    return completion_message


def tf_disable(user):
    """ Disable two factor for user """
    tf_clean_session()
    _datastore.tf_reset(user)
    tf_disabled.send(app._get_current_object(), user=user)


def is_tf_setup(user):
    """ Return True is user account is setup for 2FA. """
    return user.tf_totp_secret and user.tf_primary_method


def tf_login(user, remember=None, primary_authn_via=None):
    """Helper for two-factor authentication login

    This is called only when login/password have already been validated.
    This can be from login, register, confirm, unified sign in, unified magic link.

    The result of this is either sending a 2FA token OR starting setup for new user.
    In either case we do NOT log in user, so we must store some info in session to
    track our state (including what user).
    """

    # on initial login clear any possible state out - this can happen if on same
    # machine log in  more than once since for 2FA you are not authenticated
    # until complete 2FA.
    tf_clean_session()

    session["tf_user_id"] = user.fs_uniquifier
    if "remember":
        session["tf_remember_login"] = remember

    # Set info into form for JSON response
    json_response = {"tf_required": True}
    # if user's two-factor properties are not configured
    if user.tf_primary_method is None or user.tf_totp_secret is None:
        session["tf_state"] = "setup_from_login"
        json_response["tf_state"] = "setup_from_login"
        if not _security._want_json(request):
            return redirect(url_for_security("two_factor_setup"))

    # if user's two-factor properties are configured
    else:
        session["tf_state"] = "ready"
        json_response["tf_state"] = "ready"
        json_response["tf_primary_method"] = user.tf_primary_method

        msg = user.tf_send_security_token(
            method=user.tf_primary_method,
            totp_secret=user.tf_totp_secret,
            phone_number=getattr(user, "tf_phone_number", None),
        )
        if msg:
            # send code didn't work
            if not _security._want_json(request):
                # This is a mess - we are deep down in the login/unified sign in flow.
                do_flash(msg, "error")
                return redirect(url_for_security("login"))
            else:
                payload = json_error_response(errors=msg)
                return _security._render_json(payload, 500, None, None)

        if not _security._want_json(request):
            return redirect(url_for_security("two_factor_token_validation"))

    # JSON response - Fake up a form - doesn't really matter which.
    form = _security.login_form(MultiDict([]))
    form.user = user

    return base_render_json(form, include_user=False, additional=json_response)


def generate_tf_validity_token(fs_uniqifier):
    """Generates a unique token for the specified user.

    :param fs_uniqifier: The fs_uniqifier of a user to whom the token belongs to
    """
    return _security.tf_validity_serializer.dumps(fs_uniqifier)


def tf_validity_token_status(token):
    """Returns the expired status, invalid status, and user of a
    Two-Factor Validity token.
    For example::

        expired, invalid, user = tf_validity_token_status('...')

    :param token: The Two-Factor Validity token
    """
    return check_and_get_token_status(
        token, "tf_validity", get_within_delta("TWO_FACTOR_LOGIN_VALIDITY")
    )


def tf_verify_validility_token(token, fs_uniquifier):
    """Returns the status of the Two-Factor Validity token

    :param token: The Two-Factor Validity token
    :param fs_uniquifier: The ``fs_uniquifier`` of the submitting user.
    """
    if token is None:
        return False

    expired, invalid, uniquifier = tf_validity_token_status(token)

    if expired or invalid or (fs_uniquifier != uniquifier):

        return False

    return True


def tf_set_validity_token_cookie(response, fs_uniquifier=None, remember=False):
    """Sets the Two-Factor validity token for a specific user given that is
    configured and the user selects remember me

    :param response: The response with which to set the set_cookie
    :param fs_uniquifier: The ``fs_uniquifier`` of a user that has succcessfully
                        authenticated and validated with Two-Factor
                        authentication.
    :param remember: Flag specifying if the tf_validity cookie should be set.
    """
    if not config_value("TWO_FACTOR_ALWAYS_VALIDATE") and remember:
        token = generate_tf_validity_token(fs_uniquifier)
        cookie_kwargs = config_value("TWO_FACTOR_VALIDITY_COOKIE")
        max_age = int(get_within_delta("TWO_FACTOR_LOGIN_VALIDITY").total_seconds())
        response.set_cookie(
            "tf_validity", value=token, max_age=max_age, **cookie_kwargs
        )

    return response