1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
|
"""
flask_security.tf_plugin
~~~~~~~~~~~~~~~~~~~~~~~~
Flask-Security Two-Factor Plugin Module
:copyright: (c) 2022-2024 by J. Christopher Wagner (jwag).
:license: MIT, see LICENSE for more details.
TODO:
- add localized callback for select choices.
"""
from __future__ import annotations
import typing as t
from flask import request, redirect, session
from .decorators import unauth_csrf
from .forms import (
build_form_from_request,
get_form_field_xlate,
Form,
RadioField,
SubmitField,
)
from .proxies import _datastore, _security
from .utils import (
_,
base_render_json,
check_and_get_token_status,
config_value as cv,
do_flash,
get_message,
get_within_delta,
get_url,
login_user,
propagate_next,
simple_render_json,
url_for_security,
)
if t.TYPE_CHECKING: # pragma: no cover
import flask
from flask.typing import ResponseValue
from flask import Response
from flask_security import Security, UserMixin
class TwoFactorSelectForm(Form):
which = RadioField(get_form_field_xlate(_("Available Second Factor Methods:")))
submit = SubmitField(get_form_field_xlate(_("Select")))
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@unauth_csrf()
def tf_select() -> ResponseValue:
# Ask user which MFA method they want to use.
# This is used when a user has setup more than one type of 2FA.
form = t.cast(
TwoFactorSelectForm, build_form_from_request("two_factor_select_form")
)
# This endpoint is unauthenticated - make sure we're in a valid state
if not all(k in session for k in ["tf_user_id", "tf_select"]):
# illegal call on this endpoint
tf_clean_session()
return tf_illegal_state(form, cv("TWO_FACTOR_ERROR_VIEW"))
user = _datastore.find_user(fs_uniquifier=session["tf_user_id"])
if not user: # pragma no cover
# hard to imagine - someone deletes the user while they are logging in.
tf_clean_session()
return tf_illegal_state(form, cv("TWO_FACTOR_ERROR_VIEW"))
setup_methods = _security.two_factor_plugins.get_setup_tf_methods(user)
form.which.choices = setup_methods # type: ignore[assignment]
if form.validate_on_submit():
response = None
tf_impl = _security.two_factor_plugins.method_to_impl(user, form.which.data)
if tf_impl:
json_payload = {"tf_required": True}
response = tf_impl.tf_login(
user, json_payload, next_loc=propagate_next(request.url, None)
)
if not response: # pragma no cover
# This really can't happen unless between the time the started logging in
# and now, they deleted a second factor (which they would have to do
# in another window).
tf_clean_session()
return tf_illegal_state(form, cv("TWO_FACTOR_ERROR_VIEW"))
return response
if _security._want_json(request):
payload = {"tf_select": True, "tf_setup_methods": setup_methods}
return base_render_json(form, include_user=False, additional=payload)
return _security.render_template(
cv("TWO_FACTOR_SELECT_TEMPLATE"),
two_factor_select_form=form,
**_security._run_ctx_processor("tf_select"),
)
class TfPluginBase: # pragma no cover
def __init__(self, app: flask.Flask):
pass
def create_blueprint(
self, app: flask.Flask, bp: flask.Blueprint, state: Security
) -> None:
raise NotImplementedError
def get_setup_methods(self, user: UserMixin) -> list[str]:
"""
Return a list of methods that ``user`` has setup for this second factor
"""
raise NotImplementedError
def tf_login(
self, user: UserMixin, json_payload: dict[str, t.Any], next_loc: str | None
) -> ResponseValue:
"""
Called from first/primary authenticated views if the user successfully
authenticated, and required a second method of authentication.
This method returns the necessary information for the user UI to continue.
For forms, this is usually a redirect to a secondary sign in form. For JSON
it is just a payload that describes what the user has to do next.
"""
raise NotImplementedError
class TfPlugin:
"""
Two-Factor plugin support.
Enables multiple independent two-factor implementations to be configured for a given
app. See TfPluginBase for what a new implementation must provide.
"""
def __init__(self) -> None:
self._tf_impls: dict[str, TfPluginBase] = {}
def register_tf_impl(
# N.B. all methods must be unique across all implementations.
self,
app: flask.Flask,
name: str,
impl: t.Type[TfPluginBase],
) -> None:
self._tf_impls[name] = impl(app)
def create_blueprint(
self, app: flask.Flask, bp: flask.Blueprint, state: Security
) -> None:
if state.support_mfa:
for impl in self._tf_impls.values():
impl.create_blueprint(app, bp, state)
# Add our route for selecting between multiple active two-factor
# mechanisms.
bp.route(
cv("TWO_FACTOR_SELECT_URL", app),
methods=["GET", "POST"],
endpoint="tf_select",
)(tf_select)
def method_to_impl(self, user: UserMixin, method: str) -> TfPluginBase | None:
# reverse map a method to the implementation.
# N.B. again - requires that methods be unique across all implementations.
# There is a small window that a previously setup method was removed.
for impl in self._tf_impls.values():
setup_methods = impl.get_setup_methods(user)
if method in setup_methods:
return impl
return None # pragma no cover
def get_setup_tf_methods(self, user: UserMixin) -> list[str]:
# Return list of methods that user has setup
methods = []
for impl in self._tf_impls.values():
methods.extend(impl.get_setup_methods(user))
return methods
def tf_enter(
self,
user: UserMixin,
remember_me: bool | None,
primary_authn_via: str,
next_loc: str | None,
) -> ResponseValue | None:
"""Check if two-factor is required and if so, start the process.
Must be called in a request context.
remember_me controls 2 cookies - the remember_me cookie and the tf_validity
cookie. We use the session to hold the fact that the user requested 'remember'
across the second factor.
"""
json_payload: dict[str, t.Any]
if _security.support_mfa:
tf_setup_methods = self.get_setup_tf_methods(user)
if cv("TWO_FACTOR_REQUIRED") or len(tf_setup_methods) > 0:
tf_fresh = tf_verify_validity_token(user.fs_uniquifier)
if cv("TWO_FACTOR_ALWAYS_VALIDATE") or not tf_fresh:
# Clean out any potential old session info - in case of previous
# aborted 2FA attempt.
tf_clean_session()
json_payload = {"tf_required": True}
if remember_me:
session["tf_remember_login"] = remember_me
session["tf_user_id"] = user.fs_uniquifier
# A backwards compat hack - the original twofactor could be setup
# as part of initial login.
if len(tf_setup_methods) == 0:
# only initial two-factor implementation supports this
return self._tf_impls["code"].tf_login(
user, json_payload, next_loc
)
elif len(tf_setup_methods) == 1:
# method_to_impl can't return None here since we just
# got the methods up above.
impl = t.cast(
TfPluginBase,
self.method_to_impl(user, tf_setup_methods[0]),
)
return impl.tf_login(user, json_payload, next_loc)
else:
session["tf_select"] = True
if not _security._want_json(request):
values = dict(next=next_loc) if next_loc else dict()
return redirect(url_for_security("tf_select", **values))
# Let's force app to go through tf-select just in case we want
# to do further validation... However, provide the choices
# so they can just do a POST
json_payload.update(
{
"tf_select": True,
"tf_setup_methods": tf_setup_methods,
}
)
return simple_render_json(json_payload)
return None
def tf_complete(self, user: UserMixin, dologin: bool) -> str | None:
remember = session.pop("tf_remember_login", None)
if dologin:
login_user(user, remember=remember)
tf_clean_session()
token = None
# return a token to avoid future two-factor prompts (for a period of time)
if not cv("TWO_FACTOR_ALWAYS_VALIDATE") and remember:
token = generate_tf_validity_token(user.fs_uniquifier)
return token
def generate_tf_validity_token(fs_uniquifier):
"""Generates a unique token for the specified user.
:param fs_uniquifier: The fs_uniquifier of a user to whom the token belongs to
"""
return _security.tf_validity_serializer.dumps(fs_uniquifier)
def tf_validity_token_status(token):
"""Returns the expired status, invalid status, and user of a
Two-Factor Validity token.
For example::
expired, invalid, user = tf_validity_token_status('...')
:param token: The Two-Factor Validity token
"""
return check_and_get_token_status(
token, "tf_validity", get_within_delta("TWO_FACTOR_LOGIN_VALIDITY")
)
def tf_verify_validity_token(fs_uniquifier: str) -> bool:
"""Returns the status of the Two-Factor Validity token based on the current
request.
:param fs_uniquifier: The ``fs_uniquifier`` of the submitting user.
"""
token = request.cookies.get("tf_validity", default=None)
if token is None:
return False
expired, invalid, uniquifier = tf_validity_token_status(token)
if expired or invalid or (fs_uniquifier != uniquifier):
return False
return True
def tf_set_validity_token_cookie(response: Response, token: str) -> Response:
"""Sets the Two-Factor validity token for a specific user given that is
configured and the user selects remember me
:param response: The response with which to set the set_cookie
:param token: validity token
"""
cookie_kwargs = cv("TWO_FACTOR_VALIDITY_COOKIE")
max_age = int(get_within_delta("TWO_FACTOR_LOGIN_VALIDITY").total_seconds())
response.set_cookie("tf_validity", value=token, max_age=max_age, **cookie_kwargs)
# This is likely overkill since so far we only return this on a POST which is
# unlikely to be cached.
response.vary.add("Cookie")
return response
def tf_check_state(allowed_states: list[str]) -> UserMixin | None:
if (
not all(k in session for k in ["tf_user_id", "tf_state"])
or session["tf_state"] not in allowed_states
):
tf_clean_session()
return None
user = _datastore.find_user(fs_uniquifier=session["tf_user_id"])
if not user:
tf_clean_session()
return user
def tf_illegal_state(form, redirect_to):
m, c = get_message("TWO_FACTOR_PERMISSION_DENIED")
if not _security._want_json(request):
do_flash(m, c)
return redirect(get_url(redirect_to))
else:
form.form_errors.append(m)
return base_render_json(form, include_user=False)
def tf_clean_session():
"""
Clean out ALL stuff stored in session (e.g. on logout or restart of a session)
"""
if cv("TWO_FACTOR"):
for k in [
"tf_state",
"tf_user_id",
"tf_primary_method",
"tf_remember_login",
"tf_totp_secret",
"tf_select",
]:
session.pop(k, None)
|