"""
test_two_factor
~~~~~~~~~~~~~~~~~

two_factor tests

:copyright: (c) 2019-2025 by J. Christopher Wagner (jwag).
:license: MIT, see LICENSE for more details.
"""

from datetime import date, timedelta
import re

from freezegun import freeze_time
import markupsafe
from passlib.totp import TOTP
import pytest
from flask_principal import identity_changed
from flask_security import (
    SQLAlchemyUserDatastore,
    SmsSenderFactory,
    reset_password_instructions_sent,
    tf_profile_changed,
    uia_email_mapper,
)
from tests.test_utils import (
    SmsBadSender,
    SmsTestSender,
    authenticate,
    capture_flashes,
    capture_send_code_requests,
    check_location,
    check_xlation,
    get_form_action,
    get_form_input_value,
    get_session,
    is_authenticated,
    json_authenticate,
    logout,
)

pytestmark = pytest.mark.two_factor()


SmsSenderFactory.senders["test"] = SmsTestSender
SmsSenderFactory.senders["bad"] = SmsBadSender


def tf_authenticate(app, client, json=False, validate=True, remember=False):
    """Login/Authenticate using two factor.
    This is the equivalent of utils:authenticate
    """
    prev_sms = app.config["SECURITY_SMS_SERVICE"]
    app.config["SECURITY_SMS_SERVICE"] = "test"
    sms_sender = SmsSenderFactory.createSender("test")
    json_data = dict(email="gal@lp.com", password="password", remember=remember)
    response = client.post(
        "/login", json=json_data, headers={"Content-Type": "application/json"}
    )

    assert b'"code": 200' in response.data
    app.config["SECURITY_SMS_SERVICE"] = prev_sms

    if validate:
        code = sms_sender.messages[0].split()[-1]
        if json:
            response = client.post(
                "/tf-validate",
                json=dict(code=code),
            )
            assert response.status_code == 200
        else:
            response = client.post(
                "/tf-validate", data=dict(code=code), follow_redirects=True
            )
            assert response.status_code == 200


def tf_in_session(session):
    return any(
        k in session
        for k in [
            "tf_state",
            "tf_primary_method",
            "tf_user_id",
            "tf_remember_login",
            "tf_totp_secret",
            "tf_select",
        ]
    )


@pytest.mark.settings(two_factor_always_validate=False)
def test_always_validate(app, client, get_message):
    tf_authenticate(app, client, remember=True)
    assert client.get_cookie("tf_validity")

    logout(client)

    data = dict(email="gal@lp.com", password="password")
    response = client.post("/login", data=data, follow_redirects=True)
    assert b"Welcome gal@lp.com" in response.data
    assert response.status_code == 200

    logout(client)
    data = dict(email="gal2@lp.com", password="password")
    response = client.post("/login", data=data, follow_redirects=True)
    assert b"Please enter your authentication code" in response.data

    # make sure the cookie doesn't affect the JSON request
    client.delete_cookie("tf_validity")
    # Test JSON (this authenticates gal@lp.com)
    tf_authenticate(app, client, json=True, remember=True)
    logout(client)

    data = dict(email="gal@lp.com", password="password")
    response = client.post(
        "/login",
        json=data,
        follow_redirects=True,
    )
    assert response.status_code == 200
    # verify logged in
    is_authenticated(client, get_message)
    logout(client)

    data["email"] = "gal2@lp.com"
    response = client.post(
        "/login",
        json=data,
        follow_redirects=True,
    )
    assert response.status_code == 200
    assert response.json["response"]["tf_required"]
    assert response.json["response"]["tf_state"] == "ready"
    assert response.json["response"]["tf_primary_method"] == "authenticator"


@pytest.mark.settings(two_factor_always_validate=False)
def test_do_not_remember_tf_validity(app, client):
    tf_authenticate(app, client)
    logout(client)

    data = dict(email="gal@lp.com", password="password")
    response = client.post("/login", data=data, follow_redirects=True)
    assert b"Please enter your authentication code" in response.data

    # Test JSON
    tf_authenticate(app, client, json=True)
    logout(client)
    assert not client.get_cookie("tf_validity")

    data = dict(email="gal@lp.com", password="password")
    response = client.post(
        "/login",
        json=data,
        follow_redirects=True,
        headers={"Content-Type": "application/json"},
    )

    assert response.status_code == 200
    assert response.json["response"]["tf_required"]
    assert response.json["response"]["tf_state"] == "ready"
    assert response.json["response"]["tf_primary_method"] == "sms"


@pytest.mark.settings(
    two_factor_always_validate=False, two_factor_login_validity="-1 minutes"
)
def test_tf_expired_cookie(app, client):
    tf_authenticate(app, client, remember=True)
    logout(client)

    data = dict(email="gal@lp.com", password="password")
    response = client.post("/login", data=data, follow_redirects=True)

    assert b"Please enter your authentication code" in response.data

    # Test JSON
    tf_authenticate(app, client, json=True, remember=True)
    logout(client)
    data = dict(email="gal@lp.com", password="password")
    response = client.post(
        "/login",
        json=data,
        follow_redirects=True,
        headers={"Content-Type": "application/json"},
    )
    assert response.status_code == 200
    assert response.json["response"]["tf_required"]
    assert response.json["response"]["tf_state"] == "ready"
    assert response.json["response"]["tf_primary_method"] == "sms"


@pytest.mark.settings(two_factor_always_validate=False)
def test_change_uniquifier_invalidates_cookie(app, client):
    tf_authenticate(app, client, remember=True)
    logout(client)
    with app.app_context():
        user = app.security.datastore.find_user(email="gal@lp.com")
        app.security.datastore.set_uniquifier(user)
        app.security.datastore.commit()

    data = dict(email="gal@lp.com", password="password")
    response = client.post("/login", data=data, follow_redirects=True)

    assert b"Please enter your authentication code" in response.data

    client.delete_cookie("tf_validity")
    # Test JSON
    tf_authenticate(app, client, json=True, remember=True)
    logout(client)
    with app.app_context():
        user = app.security.datastore.find_user(email="gal@lp.com")
        app.security.datastore.set_uniquifier(user)
        app.security.datastore.commit()
    data = dict(email="gal@lp.com", password="password")
    response = client.post(
        "/login",
        json=data,
        follow_redirects=True,
        headers={"Content-Type": "application/json"},
    )
    assert response.status_code == 200
    assert response.json["response"]["tf_required"]
    assert response.json["response"]["tf_state"] == "ready"
    assert response.json["response"]["tf_primary_method"] == "sms"


@pytest.mark.settings(two_factor_always_validate=False, two_factor_required=True)
def test_tf_reset_invalidates_cookie(app, client):
    tf_authenticate(app, client, remember=True)
    logout(client)
    with app.app_context():
        user = app.security.datastore.find_user(email="gal@lp.com")
        app.security.datastore.reset_user_access(user)
        app.security.datastore.commit()

    data = dict(email="gal@lp.com", password="password")
    response = client.post("/login", data=data, follow_redirects=True)

    assert b"Two-Factor authentication adds an extra layer of security" in response.data

    client.delete_cookie("tf_validity")
    # Test JSON
    tf_authenticate(app, client, json=True, remember=True, validate=False)
    logout(client)
    with app.app_context():
        user = app.security.datastore.find_user(email="gal@lp.com")
        app.security.datastore.reset_user_access(user)
        app.security.datastore.commit()
    data = dict(email="gal@lp.com", password="password")
    response = client.post(
        "/login",
        json=data,
        follow_redirects=True,
        headers={"Content-Type": "application/json"},
    )
    assert response.status_code == 200
    assert response.json["response"]["tf_required"]
    assert response.json["response"]["tf_state"] == "setup_from_login"


@pytest.mark.settings(two_factor_required=True)
def test_two_factor_two_factor_setup_anonymous(app, client, get_message):
    # trying to pick method without doing earlier stage
    data = dict(setup="email")

    with capture_flashes() as flashes:
        response = client.post("/tf-setup", data=data)
        assert response.status_code == 302
    assert flashes[0]["category"] == "error"
    assert flashes[0]["message"].encode("utf-8") == get_message(
        "TWO_FACTOR_PERMISSION_DENIED"
    )


@pytest.mark.settings(two_factor_required=True, url_prefix="/api")
def test_two_factor_illegal_state(app, client, get_message):
    # trying to pick method without doing earlier stage
    data = dict(setup="email")

    with capture_flashes() as flashes:
        response = client.post("/api/tf-setup", data=data)
        assert response.status_code == 302
        assert "/api/login" in response.location
    assert flashes[0]["category"] == "error"
    assert flashes[0]["message"].encode("utf-8") == get_message(
        "TWO_FACTOR_PERMISSION_DENIED"
    )

    # try validate code
    response = client.post(
        "/api/tf-validate", data=dict(code=b"333"), follow_redirects=False
    )
    assert response.status_code == 302
    assert "/api/login" in response.location

    # try rescue
    response = client.post(
        "/api/tf-rescue", data=dict(help_setup="lost_device"), follow_redirects=False
    )
    assert response.status_code == 302
    assert "/api/login" in response.location


@pytest.mark.settings(two_factor_required=True)
def test_two_factor_flag(app, clients, get_message):
    # trying to verify code without going through two-factor
    # first login function
    client = clients
    wrong_code = b"000000"
    response = client.post(
        "/tf-validate", data=dict(code=wrong_code), follow_redirects=True
    )

    message = b"You currently do not have permissions to access this page"
    assert message in response.data

    # Test login using invalid email
    data = dict(email="nobody@lp.com", password="password")
    response = client.post("/login", data=data, follow_redirects=True)
    assert b"Specified user does not exist" in response.data
    response = client.post(
        "/login",
        json=data,
        headers={"Content-Type": "application/json"},
        follow_redirects=True,
    )
    assert b"Specified user does not exist" in response.data

    # Test login using valid email and invalid password
    data = dict(email="gal@lp.com", password="wrong_pass")
    response = client.post("/login", data=data, follow_redirects=True)
    assert b"Invalid password" in response.data
    response = client.post(
        "/login",
        json=data,
        headers={"Content-Type": "application/json"},
        follow_redirects=True,
    )
    assert b"Invalid password" in response.data

    # Test two-factor authentication first login
    data = dict(email="matt@lp.com", password="password")
    response = client.post("/login", data=data, follow_redirects=True)
    message = b"Two-Factor authentication adds an extra layer of security"
    assert message in response.data
    response = client.post(
        "/tf-setup", data=dict(setup="not_a_method"), follow_redirects=True
    )
    assert b"Marked method is not valid" in response.data
    with client.session_transaction() as session:
        assert session["tf_state"] == "setup_from_login"

    # try non-existing setup on setup page (using json)
    data = dict(setup="not_a_method")
    response = client.post(
        "/tf-setup",
        json=data,
        headers={"Content-Type": "application/json"},
        follow_redirects=True,
    )
    assert response.status_code == 400
    assert (
        response.json["response"]["field_errors"]["setup"][0]
        == "Marked method is not valid"
    )

    data = dict(setup="email")
    response = client.post(
        "/tf-setup",
        json=data,
        headers={"Content-Type": "application/json"},
        follow_redirects=True,
    )

    # Test for sms in process of valid login
    sms_sender = SmsSenderFactory.createSender("test")
    data = dict(email="gal@lp.com", password="password")
    response = client.post(
        "/login",
        json=data,
        headers={"Content-Type": "application/json"},
        follow_redirects=True,
    )
    assert b'"code": 200' in response.data
    assert sms_sender.get_count() == 1
    session = get_session(response)
    assert session["tf_state"] == "ready"

    code = sms_sender.messages[0].split()[-1]
    # submit bad token to two_factor_token_validation
    response = client.post("/tf-validate", data=dict(code=wrong_code))
    assert get_message("TWO_FACTOR_INVALID_TOKEN") in response.data

    # submit right token and show appropriate response
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert get_message("TWO_FACTOR_LOGIN_SUCCESSFUL") in response.data

    # Upon completion, session cookie shouldn't have any two factor stuff in it.
    assert not tf_in_session(get_session(response))

    # Test change two_factor view from sms to mail
    setup_data = dict(setup="email")
    response = client.post("/tf-setup", data=setup_data, follow_redirects=True)
    msg = b"Enter code to complete setup"
    assert msg in response.data

    # Fetch token validate form
    response = client.get("/tf-validate")
    assert response.status_code == 200
    # make sure two_factor_verify_code_form is set
    assert b'name="code"' in response.data

    code = app.mail.outbox[1].body.split()[-1]
    # submit right token and show appropriate response
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert b"You successfully changed your two-factor method" in response.data

    # Test change two_factor password confirmation view to authenticator
    # Setup authenticator
    setup_data = dict(setup="authenticator")
    response = client.post("/tf-setup", data=setup_data, follow_redirects=True)
    assert b"Open an authenticator app on your device" in response.data
    # verify png QRcode is present
    assert b"data:image/svg+xml;base64," in response.data

    # parse out key
    rd = response.data.decode("utf-8")
    matcher = re.match(r".*((?:\S{4}-){7}\S{4}).*", rd, re.DOTALL)
    totp_secret = matcher.group(1)

    # Generate token from passed totp_secret and confirm setup
    totp = TOTP(totp_secret)
    code = totp.generate().token
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert b"You successfully changed your two-factor method" in response.data

    logout(client)

    # Test login with remember_token
    assert not client.get_cookie("remember_token")
    data = dict(email="gal@lp.com", password="password", remember=True)
    response = client.post(
        "/login",
        json=data,
        headers={"Content-Type": "application/json"},
        follow_redirects=True,
    )

    # Generate token from passed totp_secret
    code = totp.generate().token
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert get_message("TWO_FACTOR_LOGIN_SUCCESSFUL") in response.data

    # Verify that the remember token is properly set
    assert client.get_cookie("remember_token")

    response = logout(client)
    # Verify that logout clears session info
    assert not tf_in_session(get_session(response))

    # Test two-factor authentication first login
    data = dict(email="matt@lp.com", password="password")
    response = client.post("/login", data=data, follow_redirects=True)
    message = b"Two-Factor authentication adds an extra layer of security"
    assert message in response.data

    # check availability of qrcode when this option is not picked
    assert b"data:image/png;base64," not in response.data

    # check availability of qrcode page when this option is picked
    setup_data = dict(setup="authenticator")
    response = client.post("/tf-setup", data=setup_data, follow_redirects=True)
    assert b"Open an authenticator app on your device" in response.data
    assert b"data:image/svg+xml;base64," in response.data

    # check appearance of setup page when sms picked and phone number entered
    sms_sender = SmsSenderFactory.createSender("test")
    data = dict(setup="sms", phone="+442083661177")
    response = client.post("/tf-setup", data=data, follow_redirects=True)
    assert b"Enter code to complete setup" in response.data
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]

    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert get_message("TWO_FACTOR_LOGIN_SUCCESSFUL") in response.data
    assert not tf_in_session(get_session(response))

    logout(client)

    # check when two_factor_rescue function should not appear
    rescue_data_json = dict(help_setup="lost_device")
    response = client.post(
        "/tf-rescue",
        json=rescue_data_json,
        headers={"Content-Type": "application/json"},
    )
    assert b'"code": 400' in response.data

    # check when two_factor_rescue function should appear
    data = dict(email="gal2@lp.com", password="password")
    response = client.post("/login", data=data, follow_redirects=True)
    assert b"Please enter your authentication code" in response.data
    rescue_data = dict(help_setup="email")
    response = client.post("/tf-rescue", data=rescue_data, follow_redirects=True)
    message = b"The code for authentication was sent to your email address"
    assert message in response.data
    rescue_data = dict(help_setup="help")
    response = client.post("/tf-rescue", data=rescue_data, follow_redirects=True)
    message = b"An email was sent to us in order to reset your application account"
    assert message in response.data


@pytest.mark.settings(two_factor_rescue_email=False)
def test_no_rescue_email(app, client):
    headers = {"Accept": "application/json", "Content-Type": "application/json"}

    response = client.post(
        "/login", json=dict(email="gal2@lp.com", password="password")
    )
    assert response.json["response"]["tf_required"]

    response = client.get("/tf-rescue", headers=headers)
    options = response.json["response"]["recovery_options"]
    assert len(options.keys()) == 1
    assert "help" in options.keys()

    # make sure that even if post using email - we get an error
    response = client.post("/tf-rescue", json=dict(help_setup="email"))
    assert response.status_code == 400
    assert (
        response.json["response"]["field_errors"]["help_setup"][0]
        == "Not a valid choice."
    )


@pytest.mark.settings(two_factor_required=True)
def test_setup_bad_phone(app, client, get_message):
    data = dict(email="matt@lp.com", password="password")
    response = client.post("/login", data=data, follow_redirects=True)
    message = b"Two-Factor authentication adds an extra layer of security"
    assert message in response.data

    sms_sender = SmsSenderFactory.createSender("test")
    data = dict(setup="sms", phone="555-1212")
    response = client.post("/tf-setup", data=data, follow_redirects=True)
    assert b"Phone number not valid" in response.data
    assert sms_sender.get_count() == 0

    # require phone number with SMS
    data = dict(setup="sms")
    response = client.post("/tf-setup", data=data, follow_redirects=True)
    assert b"Phone number not valid" in response.data
    assert sms_sender.get_count() == 0

    # Now setup good phone
    response = client.post(
        "/tf-setup", data=dict(setup="sms", phone="650-555-1212"), follow_redirects=True
    )
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]
    # shouldn't get authenticator stuff when setting up SMS
    assert b"data:image/png;base64," not in response.data

    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert get_message("TWO_FACTOR_LOGIN_SUCCESSFUL") in response.data
    assert not tf_in_session(get_session(response))

    headers = {"Accept": "application/json", "Content-Type": "application/json"}
    response = client.get("/tf-setup", headers=headers)
    # N.B. right now for tfa - we don't canonicalize phone number (since user
    # never has to type it in).
    assert response.json["response"]["tf_phone_number"] == "650-555-1212"


@pytest.mark.settings(two_factor_required=True)
def test_json(app, client):
    """
    Test login/setup using JSON.
    """
    headers = {"Accept": "application/json", "Content-Type": "application/json"}

    # Login with someone already setup.
    sms_sender = SmsSenderFactory.createSender("test")
    data = dict(email="gal@lp.com", password="password")
    response = client.post("/login", json=data, headers=headers)
    assert response.status_code == 200
    assert response.json["response"]["tf_required"]
    assert response.json["response"]["tf_state"] == "ready"
    assert response.json["response"]["tf_primary_method"] == "sms"

    # Verify SMS sent
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]
    response = client.post("/tf-validate", json=dict(code=code))
    assert response.status_code == 200
    # verify logged in
    response = client.get("/profile", follow_redirects=False)
    assert response.status_code == 200
    logout(client)

    # Test that user not yet setup for 2FA gets correct response.
    data = dict(email="matt@lp.com", password="password")
    response = client.post("/login", json=data)
    assert response.json["response"]["tf_required"]
    assert response.json["response"]["tf_state"] == "setup_from_login"

    # Start setup process.
    response = client.get("/tf-setup", headers=headers)
    assert response.json["response"]["tf_required"]
    assert "sms" in response.json["response"]["tf_available_methods"]

    # Now try to setup
    data = dict(setup="sms", phone="+442083661177")
    response = client.post("/tf-setup", json=data)
    assert response.status_code == 200
    assert response.json["response"]["tf_state"] == "validating_profile"
    assert response.json["response"]["tf_primary_method"] == "sms"
    code = sms_sender.messages[0].split()[-1]
    response = client.post("/tf-validate", json=dict(code=code), headers=headers)
    assert response.status_code == 200
    assert "csrf_token" in response.json["response"]
    assert response.json["response"]["user"]["email"] == "matt@lp.com"

    logout(client)

    # Verify tf is now setup and can directly get code
    data = dict(email="matt@lp.com", password="password")
    response = client.post("/login", json=data)
    assert response.json["response"]["tf_required"]
    assert response.json["response"]["tf_state"] == "ready"

    # send bad code
    response = client.post("/tf-validate", json=dict(code="whatsup"))
    assert response.status_code == 400
    assert response.json["response"]["field_errors"]["code"][0] == "Invalid code"
    assert response.json["response"]["errors"][0] == "Invalid code"

    # Do a GET - should get recovery options
    response = client.get("/tf-validate", headers=headers)
    options = response.json["response"]["recovery_options"]
    assert "email" in options.keys()
    assert "/tf-rescue" in options["email"]

    # now send correct code
    code = sms_sender.messages[0].split()[-1]
    response = client.post("/tf-validate", json=dict(code=code))
    assert response.status_code == 200
    # verify logged in
    response = client.get("/profile", follow_redirects=False)
    assert response.status_code == 200

    # tf-setup should provide existing info
    response = client.get("/tf-setup", headers=headers)
    assert response.json["response"]["tf_required"]
    assert "sms" in response.json["response"]["tf_available_methods"]
    assert "disable" not in response.json["response"]["tf_available_methods"]
    assert response.json["response"]["tf_primary_method"] == "sms"
    assert response.json["response"]["tf_phone_number"] == "+442083661177"
    assert not tf_in_session(get_session(response))


@pytest.mark.settings(two_factor_rescue_mail="helpme@myapp.com")
def test_rescue_json(app, client):
    # it's an error if not primary authenticated
    rescue_data_json = dict(help_setup="help")
    response = client.post(
        "/tf-rescue",
        json=rescue_data_json,
    )
    assert response.status_code == 400

    # check when two_factor_rescue function should appear
    data = dict(email="gal2@lp.com", password="password")
    response = client.post("/login", json=data)
    assert response.json["response"]["tf_required"]

    rescue_data = dict(help_setup="email")
    response = client.post("/tf-rescue", json=rescue_data)
    assert response.status_code == 200
    outbox = app.mail.outbox

    assert outbox[0].to == ["gal2@lp.com"]
    assert outbox[0].from_email == "no-reply@localhost"
    assert outbox[0].subject == "Two-Factor Login"
    matcher = re.match(r".*code: ([0-9]+).*", outbox[0].body, re.IGNORECASE | re.DOTALL)
    response = client.post("/tf-validate", json=dict(code=matcher.group(1)))
    assert response.status_code == 200
    logout(client)

    # Try rescue with no email (should send email to admin)
    client.post("/login", json=data)
    rescue_data = dict(help_setup="help")
    response = client.post("/tf-rescue", json=rescue_data)
    assert response.status_code == 200
    outbox = app.mail.outbox

    assert outbox[1].to == ["helpme@myapp.com"]
    assert outbox[1].from_email == "no-reply@localhost"
    assert outbox[1].subject == "Two-Factor Rescue"
    assert "gal2@lp.com" in outbox[1].body


@pytest.mark.settings(two_factor_required=True)
def test_json_auth_token(app, client):
    """
    Test getting auth_token with two-factor
    """
    headers = {"Accept": "application/json", "Content-Type": "application/json"}

    # Login with someone already setup.
    sms_sender = SmsSenderFactory.createSender("test")
    data = dict(email="gal@lp.com", password="password")
    response = client.post("/login", json=data, headers=headers)
    assert response.status_code == 200
    assert response.json["response"]["tf_required"]

    # Verify SMS sent
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]
    response = client.post("/tf-validate?include_auth_token", json=dict(code=code))
    assert response.status_code == 200
    token = response.json["response"]["user"]["authentication_token"]
    headers = {"Authentication-Token": token}
    # make sure can access restricted page
    response = client.get("/token", headers=headers)
    assert b"Token Authentication" in response.data


@pytest.mark.settings(two_factor_required=True)
def test_no_opt_out(app, client, get_message):
    # Test if 2FA required, can't opt-out.
    sms_sender = SmsSenderFactory.createSender("test")
    response = client.post(
        "/login",
        data=dict(email="gal@lp.com", password="password"),
        follow_redirects=True,
    )
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]

    # submit right token and show appropriate response
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert get_message("TWO_FACTOR_LOGIN_SUCCESSFUL") in response.data

    response = client.get("/tf-setup", follow_redirects=True)
    assert b"Disable two factor" not in response.data
    assert b"Currently setup two-factor method: SMS" in response.data

    # Try to opt-out
    data = dict(setup="disable")
    response = client.post("/tf-setup", data=data, follow_redirects=True)
    assert response.status_code == 200
    assert b"Marked method is not valid" in response.data


@pytest.mark.settings(
    two_factor_setup_url="/custom-setup", two_factor_rescue_url="/custom-rescue"
)
def test_custom_urls(client):
    response = client.get("/tf-setup")
    assert response.status_code == 404
    response = client.get("/custom-setup")
    assert response.status_code == 302
    response = client.get("/custom-rescue")
    assert response.status_code == 302


def test_evil_validate(app, client):
    """
    Test logged in, and randomly try to validate a token
    """
    signalled_identity = []

    @identity_changed.connect_via(app)
    def on_identity_changed(app, identity):
        signalled_identity.append(identity.id)

    response = authenticate(client, "jill@lp.com")
    session = get_session(response)
    assert "tf_state" not in session
    with app.app_context():
        user = app.security.datastore.find_user(email="jill@lp.com")
        assert signalled_identity[0] == user.fs_uniquifier
    del signalled_identity[:]

    # try to validate
    response = client.post("/tf-validate", data=dict(code="?"), follow_redirects=True)
    # This should log us out since it thinks we are evil
    assert not signalled_identity[0]
    del signalled_identity[:]


def test_opt_in(app, client, get_message):
    """
    Test entire lifecycle of user not having 2FA - setting it up, then deciding
    to turn it back off
    All using forms based API
    """

    signalled_identity = []

    @identity_changed.connect_via(app)
    def on_identity_changed(app, identity):
        signalled_identity.append(identity.id)

    response = authenticate(client, "jill@lp.com")
    session = get_session(response)
    assert "tf_state" not in session
    with app.app_context():
        user = app.security.datastore.find_user(email="jill@lp.com")
        assert signalled_identity[0] == user.fs_uniquifier
    del signalled_identity[:]

    # opt-in for SMS 2FA
    sms_sender = SmsSenderFactory.createSender("test")
    data = dict(setup="sms", phone="+442083661177")
    response = client.post("/tf-setup", data=data, follow_redirects=True)
    assert b"Enter code to complete setup" in response.data
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]

    # Validate token - this should complete 2FA setup
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert b"You successfully changed" in response.data
    assert check_location(app, response.history[0].location, "/tf-setup")

    # Upon completion, session cookie shouldnt have any two factor stuff in it.
    session = get_session(response)
    assert not tf_in_session(session)

    # Log out
    logout(client)
    assert not signalled_identity[0]
    del signalled_identity[:]

    # Login now should require 2FA with sms
    sms_sender = SmsSenderFactory.createSender("test")
    response = authenticate(client, "jill@lp.com")
    session = get_session(response)
    assert session["tf_state"] == "ready"
    assert len(signalled_identity) == 0

    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert get_message("TWO_FACTOR_LOGIN_SUCCESSFUL") in response.data
    # Verify now logged in
    with app.app_context():
        user = app.security.datastore.find_user(email="jill@lp.com")
        assert signalled_identity[0] == user.fs_uniquifier
    del signalled_identity[:]

    # Now opt back out.
    data = dict(setup="disable")
    response = client.post("/tf-setup", data=data, follow_redirects=True)
    assert b"You successfully disabled two-factor authorization." in response.data

    # Log out
    logout(client)
    assert not signalled_identity[0]
    del signalled_identity[:]

    # Should be able to log in with just user/pass
    response = authenticate(client, "jill@lp.com")
    session = get_session(response)
    assert "tf_state" not in session
    with app.app_context():
        user = app.security.datastore.find_user(email="jill@lp.com")
        assert signalled_identity[0] == user.fs_uniquifier


def test_opt_in_nc(app, client_nc, get_message):
    """
    Test tf-setup without cookies
    """
    response = json_authenticate(client_nc, "jill@lp.com")
    assert response.status_code == 200
    token = response.json["response"]["user"]["authentication_token"]
    headers = {"Authentication-Token": token, "Accept": "application/json"}

    sms_sender = SmsSenderFactory.createSender("test")
    data = dict(setup="sms", phone="+442083661177")
    response = client_nc.post("/tf-setup", json=data, headers=headers)
    assert response.status_code == 200
    state_token = response.json["response"]["tf_state_token"]
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]

    # send bad token
    response = client_nc.post(
        "/tf-setup/not-a-token", json=dict(code=code), headers=headers
    )
    assert response.status_code == 400
    assert response.json["response"]["errors"][0].encode("utf-8") == get_message(
        "API_ERROR"
    )

    # send bad code
    response = client_nc.post(
        f"/tf-setup/{state_token}", json=dict(code=12345), headers=headers
    )
    assert response.status_code == 400
    assert response.json["response"]["errors"][0].encode("utf-8") == get_message(
        "TWO_FACTOR_INVALID_TOKEN"
    )

    # Validate token - this should complete 2FA setup
    @tf_profile_changed.connect_via(app)
    def pc(sender, user, method, **kwargs):
        assert method == "sms"
        assert user.tf_phone_number == "+442083661177"

    response = client_nc.post(
        f"/tf-setup/{state_token}", json=dict(code=code), headers=headers
    )
    assert response.status_code == 200

    response = client_nc.get("/tf-setup", headers=headers)
    assert response.json["response"]["tf_method"] == "sms"
    assert response.json["response"]["tf_phone_number"] == "+442083661177"


def test_opt_in_nc_expired(app, client_nc, get_message):
    """
    Test tf-setup without cookies - expired token
    """
    with freeze_time(
        date.today() + timedelta(days=-1)
    ):  # older than TWO_FACTOR_SETUP_WITHIN
        response = json_authenticate(client_nc, "jill@lp.com")
        assert response.status_code == 200
        token = response.json["response"]["user"]["authentication_token"]
        headers = {"Authentication-Token": token, "Accept": "application/json"}

        sms_sender = SmsSenderFactory.createSender("test")
        data = dict(setup="sms", phone="+442083661177")
        response = client_nc.post("/tf-setup", json=data, headers=headers)
        assert response.status_code == 200
        state_token = response.json["response"]["tf_state_token"]
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]

    # Validate token - this should complete 2FA setup
    response = client_nc.post(
        f"/tf-setup/{state_token}", json=dict(code=code), headers=headers
    )
    assert response.status_code == 400
    assert response.json["response"]["errors"][0].encode("utf-8") == get_message(
        "TWO_FACTOR_SETUP_EXPIRED",
        within=app.config["SECURITY_TWO_FACTOR_SETUP_WITHIN"],
    )


def test_opt_in_state_token(app, client, get_message):
    """
    Test using forms and new state_token approach (rather than sessions to store
    intermediate state)
    """
    authenticate(client, "jill@lp.com")

    # opt-in for SMS 2FA
    sms_sender = SmsSenderFactory.createSender("test")
    data = dict(setup="sms", phone="+442083661177")
    response = client.post("/tf-setup", data=data, follow_redirects=True)
    assert b"Enter code to complete setup" in response.data
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]
    verify_url = get_form_action(response, 1)  # this will be with state_token

    # send in bad token
    response = client.post(
        "/tf-setup/not-a-token", data=dict(code=code), follow_redirects=True
    )
    assert check_location(app, response.history[0].location, "/tf-setup")
    assert get_message("API_ERROR") in response.data

    # send in bad code
    response = client.post(verify_url, data=dict(code=12345), follow_redirects=True)
    assert check_location(app, response.history[0].location, "/tf-setup")
    assert get_message("TWO_FACTOR_INVALID_TOKEN") in response.data

    # Validate token - this should complete 2FA setup
    response = client.post(verify_url, data=dict(code=code), follow_redirects=True)
    assert b"You successfully changed" in response.data
    assert check_location(app, response.history[0].location, "/tf-setup")

    # Upon completion, session cookie shouldn't have any two factor stuff in it.
    session = get_session(response)
    assert not tf_in_session(session)

    response = client.get("/tf-setup")
    assert b"Disable two-factor" in response.data
    assert b"Currently setup two-factor method: SMS" in response.data


def test_opt_out_json(app, client, get_message):
    headers = {"Accept": "application/json", "Content-Type": "application/json"}

    tf_authenticate(app, client)
    response = client.get("tf-setup", headers=headers)
    assert "disable" in response.json["response"]["tf_available_methods"]

    response = client.post("tf-setup", json=dict(setup="disable"), headers=headers)
    assert response.status_code == 200
    logout(client)

    # Should be able to log in with just user/pass
    response = authenticate(client, "gal@lp.com")
    session = get_session(response)
    assert "tf_state" not in session
    # verify logged in
    assert is_authenticated(client, get_message)
    response = client.get("tf-setup", headers=headers)
    assert "disable" not in response.json["response"]["tf_available_methods"]


@pytest.mark.filterwarnings("ignore")
@pytest.mark.recoverable()
@pytest.mark.settings(two_factor_required=True, auto_login_after_reset=True)
def test_recoverable(app, client, get_message):
    # make sure 'forgot password' doesn't bypass 2FA.
    # 'gal@lp.com' already setup for SMS

    rtokens = []
    sms_sender = SmsSenderFactory.createSender("test")

    @reset_password_instructions_sent.connect_via(app)
    def on_instructions_sent(sapp, **kwargs):
        rtokens.append(kwargs["token"])

    client.post("/reset", data=dict(email="gal@lp.com"), follow_redirects=True)
    response = client.post(
        "/reset/" + rtokens[0],
        data={"password": "awesome sunset", "password_confirm": "awesome sunset"},
        follow_redirects=True,
    )
    # Should have redirected us to the 2FA login page
    assert b"Please enter your authentication code" in response.data

    # we shouldn't be logged in
    response = client.get("/profile", follow_redirects=False)
    assert response.status_code == 302

    # Grab code that was sent
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert get_message("TWO_FACTOR_LOGIN_SUCCESSFUL") in response.data

    # verify we are logged in
    response = client.get("/profile", follow_redirects=False)
    assert response.status_code == 200


@pytest.mark.settings(two_factor_required=True)
def test_admin_setup_reset(app, client, get_message):
    # Verify can use administrative datastore method to setup SMS
    # and that administrative reset removes access.
    sms_sender = SmsSenderFactory.createSender("test")

    data = dict(email="gene@lp.com", password="password")
    response = client.post(
        "/login", json=data, headers={"Content-Type": "application/json"}
    )
    assert response.json["response"]["tf_required"]

    # we shouldn't be logged in
    response = client.get("/profile", follow_redirects=False)
    assert response.status_code == 302
    assert response.location == "/login?next=/profile"

    # Use admin to setup gene's SMS/phone.
    with app.app_context():
        user = app.security.datastore.find_user(email="gene@lp.com")
        totp_secret = app.security._totp_factory.generate_totp_secret()
        app.security.datastore.tf_set(user, "sms", totp_secret, phone="+442083661177")
        app.security.datastore.commit()

    response = authenticate(client, "gene@lp.com")
    session = get_session(response)
    assert session["tf_state"] == "ready"

    # Grab code that was sent
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert get_message("TWO_FACTOR_LOGIN_SUCCESSFUL") in response.data

    # verify we are logged in
    response = client.get("/profile", follow_redirects=False)
    assert response.status_code == 200

    # logout
    logout(client)

    # use administrative reset method
    with app.app_context():
        user = app.security.datastore.find_user(email="gene@lp.com")
        app.security.datastore.reset_user_access(user)
        app.security.datastore.commit()

    data = dict(email="gene@lp.com", password="password")
    response = client.post(
        "/login", json=data, headers={"Content-Type": "application/json"}
    )
    assert response.json["response"]["tf_required"]
    assert response.json["response"]["tf_state"] == "setup_from_login"

    # we shouldn't be logged in
    response = client.get("/profile", follow_redirects=False)
    assert response.status_code == 302


@pytest.mark.settings(two_factor_required=True)
def test_datastore(app, clients, get_message):
    # Test that user record is properly set after proper 2FA setup.
    client = clients
    sms_sender = SmsSenderFactory.createSender("test")
    data = dict(email="gene@lp.com", password="password")
    response = client.post(
        "/login", json=data, headers={"Content-Type": "application/json"}
    )
    assert response.json["meta"]["code"] == 200
    session = get_session(response)
    assert session["tf_state"] == "setup_from_login"

    # setup
    data = dict(setup="sms", phone="+442083661177")
    response = client.post(
        "/tf-setup", json=data, headers={"Content-Type": "application/json"}
    )

    assert sms_sender.get_count() == 1
    session = get_session(response)
    assert session["tf_state"] == "validating_profile"
    assert session["tf_primary_method"] == "sms"

    code = sms_sender.messages[0].split()[-1]

    # submit token and show appropriate response
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert get_message("TWO_FACTOR_LOGIN_SUCCESSFUL") in response.data
    session = get_session(response)
    # Verify that successful login clears session info
    assert not tf_in_session(session)

    with app.app_context():
        user = app.security.datastore.find_user(email="gene@lp.com")
        assert user.tf_primary_method == "sms"
        assert user.tf_phone_number == "+442083661177"
        assert "enckey" in user.tf_totp_secret


def test_totp_secret_generation(app, client):
    """
    Test the totp secret generation upon changing method to make sure
    it stays the same after the process is completed
    """

    # Properly log in jill for this test
    signalled_identity = []

    @identity_changed.connect_via(app)
    def on_identity_changed(app, identity):
        signalled_identity.append(identity.id)

    response = authenticate(client, "jill@lp.com")
    session = get_session(response)
    assert "tf_state" not in session
    with app.app_context():
        user = app.security.datastore.find_user(email="jill@lp.com")
        assert signalled_identity[0] == user.fs_uniquifier
        assert not user.tf_totp_secret
    del signalled_identity[:]

    sms_sender = SmsSenderFactory.createSender("test")
    data = dict(setup="sms", phone="+442083661188")
    response = client.post("/tf-setup", data=data, follow_redirects=True)
    assert b"Enter code to complete setup" in response.data
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]

    # Retrieve the currently generated totp secret for later comparison
    session = get_session(response)
    generated_secret = session["tf_totp_secret"]
    assert "enckey" in generated_secret

    # Validate token - this should complete 2FA setup
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert b"You successfully changed" in response.data

    # Retrieve the final totp secret and make sure it matches the previous one
    with app.app_context():
        user = app.security.datastore.find_user(email="jill@lp.com")
        assert generated_secret == user.tf_totp_secret

    # Finally opt back out and check that tf_totp_secret is None
    data = dict(setup="disable")
    response = client.post("/tf-setup", data=data, follow_redirects=True)
    assert b"You successfully disabled two-factor authorization." in response.data
    with app.app_context():
        user = app.security.datastore.find_user(email="jill@lp.com")
        assert user.tf_totp_secret is None

    # Log out
    logout(client)
    assert not signalled_identity[0]
    del signalled_identity[:]


@pytest.mark.settings(two_factor_enabled_methods=["authenticator"])
def test_just_authenticator(app, client):
    authenticate(client, email="jill@lp.com")

    response = client.get("/tf-setup", follow_redirects=True)
    assert b"Set up using SMS" not in response.data

    data = dict(setup="authenticator")
    response = client.post("/tf-setup", data=data, follow_redirects=True)
    assert b"Submit Code" in response.data

    # test json
    response = client.post("/tf-setup", json=data)
    assert response.status_code == 200


@pytest.mark.settings(
    USER_IDENTITY_ATTRIBUTES=[
        {"username": {"mapper": lambda x: "@" not in x}},
        {"email": {"mapper": uia_email_mapper}},
    ]
)
def test_authr_identity(app, client):
    # Setup authenticator
    headers = {"Accept": "application/json", "Content-Type": "application/json"}
    authenticate(client, email="jill@lp.com")

    setup_data = dict(setup="authenticator")
    response = client.post("/tf-setup", json=setup_data, headers=headers)
    assert response.json["response"]["tf_authr_issuer"] == "tests"
    assert response.json["response"]["tf_authr_username"] == "jill"
    assert response.json["response"]["tf_state"] == "validating_profile"
    assert "tf_authr_key" in response.json["response"]


@pytest.mark.settings(
    USER_IDENTITY_ATTRIBUTES=[
        {"security_number": {"mapper": lambda x: x.isdigit()}},
        {"email": {"mapper": uia_email_mapper}},
    ]
)
def test_authr_identity_num(app, client):
    # Test that response to setup has 'security_number' as the 'username'
    # since it is listed first.
    headers = {"Accept": "application/json", "Content-Type": "application/json"}
    authenticate(client, email="jill@lp.com")

    setup_data = dict(setup="authenticator")
    response = client.post("/tf-setup", json=setup_data, headers=headers)
    assert response.json["response"]["tf_authr_username"] == "456789"
    assert "tf_authr_key" in response.json["response"]


@pytest.mark.settings(
    USER_IDENTITY_ATTRIBUTES=[
        {"email": {"mapper": uia_email_mapper}},
        {"username": {"mapper": lambda x: x}},
    ]
)
def test_email_salutation(app, client):
    authenticate(client, email="jill@lp.com")
    response = client.post("/tf-setup", data=dict(setup="email"), follow_redirects=True)
    msg = b"Enter code to complete setup"
    assert msg in response.data
    outbox = app.mail.outbox

    assert "jill@lp.com" in outbox[0].to
    assert "jill@lp.com" in outbox[0].body
    assert "jill@lp.com" in outbox[0].alternatives[0][0]


@pytest.mark.settings(
    USER_IDENTITY_ATTRIBUTES=[
        {"username": {"mapper": lambda x: "@" not in x}},
        {"email": {"mapper": uia_email_mapper}},
    ]
)
def test_username_salutation(app, client):
    authenticate(client, email="jill@lp.com")
    response = client.post("/tf-setup", data=dict(setup="email"), follow_redirects=True)
    msg = b"Enter code to complete setup"
    assert msg in response.data
    outbox = app.mail.outbox

    assert "jill@lp.com" in outbox[0].to
    assert "jill@lp.com" not in outbox[0].body
    assert "jill@lp.com" not in outbox[0].alternatives[0][0]
    assert "jill" in outbox[0].body


@pytest.mark.settings(sms_service="bad")
def test_bad_sender(app, client, get_message):
    # If SMS sender fails - make sure propagated
    # Test form, json, x signin, setup
    headers = {"Accept": "application/json", "Content-Type": "application/json"}

    # test normal, already setup up login.
    with capture_flashes() as flashes:
        data = {"email": "gal@lp.com", "password": "password"}
        response = client.post("login", data=data, follow_redirects=False)
        assert response.status_code == 302
        assert "/login" in response.location
    assert get_message("FAILED_TO_SEND_CODE") in flashes[0]["message"].encode("utf-8")

    # test w/ JSON
    data = dict(email="gal@lp.com", password="password")
    response = client.post("login", json=data, headers=headers)
    assert response.status_code == 500
    assert response.json["response"]["errors"][0].encode("utf-8") == get_message(
        "FAILED_TO_SEND_CODE"
    )

    # Now test setup
    tf_authenticate(app, client)
    data = dict(setup="sms", phone="+442083661188")
    response = client.post("tf-setup", data=data)
    assert get_message("FAILED_TO_SEND_CODE") in response.data

    response = client.post("tf-setup", json=data, headers=headers)
    assert response.status_code == 500
    assert response.json["response"]["errors"][0].encode("utf-8") == get_message(
        "FAILED_TO_SEND_CODE"
    )


def test_replace_send_code(app, get_message):
    pytest.importorskip("sqlalchemy")
    pytest.importorskip("flask_sqlalchemy")

    # replace tf_send_code - and have it return an error to check that.
    from flask_sqlalchemy import SQLAlchemy
    from flask_security.models import fsqla_v2 as fsqla
    from flask_security import Security, hash_password

    app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:"
    db = SQLAlchemy(app)

    fsqla.FsModels.set_db_info(db)

    class Role(db.Model, fsqla.FsRoleMixin):
        pass

    class User(db.Model, fsqla.FsUserMixin):
        rv = [None, "That didnt work out as we planned", "Failed Again"]

        def tf_send_security_token(self, method, **kwargs):
            return User.rv.pop(0)

    with app.app_context():
        db.create_all()

    ds = SQLAlchemyUserDatastore(db, User, Role)
    app.security = Security(app, datastore=ds)

    with app.app_context():
        client = app.test_client()

        ds.create_user(
            email="trp@lp.com",
            password=hash_password("password"),
            tf_primary_method="sms",
            tf_totp_secret=app.security._totp_factory.generate_totp_secret(),
        )
        ds.commit()

    data = dict(email="trp@lp.com", password="password")
    response = client.post("/login", data=data, follow_redirects=True)
    assert b"Please enter your authentication code" in response.data
    rescue_data = dict(help_setup="email")
    response = client.post("/tf-rescue", data=rescue_data, follow_redirects=True)
    assert b"That didnt work out as we planned" in response.data

    # Test JSON
    headers = {"Accept": "application/json", "Content-Type": "application/json"}
    response = client.post("/tf-rescue", json=rescue_data, headers=headers)
    assert response.status_code == 500
    assert response.json["response"]["field_errors"]["help_setup"][0] == "Failed Again"
    with app.app_context():
        db.engine.dispose()


def test_propagate_next(app, client):
    # verify we propagate the ?next param all the way through a two-factor login
    with capture_send_code_requests() as codes:
        data = dict(email="gal@lp.com", password="password")
        response = client.post("/login?next=/im-in", data=data, follow_redirects=True)
        assert "?next=/im-in" in response.request.url
        # grab URL from form to show that our template propagates ?next
        verify_url = get_form_action(response)
        response = client.post(
            verify_url, data=dict(code=codes[0]["login_token"]), follow_redirects=False
        )
        assert "/im-in" in response.location
        logout(client)

        # do it with next in the form
        data = dict(email="gal@lp.com", password="password", next="/im-in")
        response = client.post("/login", data=data, follow_redirects=True)
        assert "?next=/im-in" in response.request.url
        # grab URL from form to show that our template propagates ?next
        verify_url = get_form_action(response)
        response = client.post(
            verify_url, data=dict(code=codes[1]["login_token"]), follow_redirects=False
        )
        assert "/im-in" in response.location


@pytest.mark.settings(freshness=timedelta(minutes=0))
def test_verify(app, client, get_message):
    # Test setup when reauthenticate required
    authenticate(client)
    response = client.get("tf-setup", follow_redirects=False)
    assert check_location(app, response.location, "/verify?next=/tf-setup")
    logout(client)

    # Now try again - follow redirects to get to verify form
    # This call should require re-verify
    authenticate(client)
    response = client.get("tf-setup", follow_redirects=True)
    assert get_message("REAUTHENTICATION_REQUIRED") in response.data
    verify_password_url = get_form_action(response)

    # Send wrong password
    response = client.post(
        verify_password_url,
        data=dict(password="iforgot"),
        follow_redirects=True,
    )
    assert response.status_code == 200
    assert get_message("INVALID_PASSWORD") in response.data

    # Verify with correct password
    with capture_flashes() as flashes:
        response = client.post(
            verify_password_url,
            data=dict(password="password"),
            follow_redirects=False,
        )
        assert response.status_code == 302
        assert check_location(app, response.location, "/tf-setup")

    assert get_message("REAUTHENTICATION_SUCCESSFUL") == flashes[0]["message"].encode(
        "utf-8"
    )


def test_verify_json(app, client, get_message):
    # Test setup when reauthenticate required
    # N.B. with freshness=0 we never set a grace period and should never be able to
    # get to /tf-setup
    authenticate(client)
    headers = {"Accept": "application/json", "Content-Type": "application/json"}

    app.config["SECURITY_FRESHNESS"] = timedelta(minutes=0)
    response = client.get("tf-setup", headers=headers)
    assert response.status_code == 401
    assert response.json["response"]["reauth_required"]

    response = client.post("verify", json=dict(password="notmine"), headers=headers)
    assert response.status_code == 400
    assert response.json["response"]["errors"][0].encode("utf-8") == get_message(
        "INVALID_PASSWORD"
    )

    response = client.post("verify", json=dict(password="password"), headers=headers)
    assert response.status_code == 200

    app.config["SECURITY_FRESHNESS"] = timedelta(minutes=60)
    response = client.get("tf-setup", headers=headers)
    assert response.status_code == 200


@pytest.mark.settings(freshness=timedelta(minutes=-1))
def test_setup_nofresh(app, client, get_message):
    authenticate(client)
    response = client.get("tf-setup", follow_redirects=False)
    assert response.status_code == 200


@pytest.mark.settings(two_factor_enabled_methods=["email"])
def test_no_sms(app, get_message):
    pytest.importorskip("sqlalchemy")
    pytest.importorskip("flask_sqlalchemy")

    # Make sure that don't require tf_phone_number if SMS isn't an option.
    from sqlalchemy import (
        Boolean,
        Column,
        Integer,
        String,
    )
    from sqlalchemy.orm import relationship, backref
    from flask_sqlalchemy import SQLAlchemy
    from flask_security.models import fsqla_v2 as fsqla
    from flask_security import Security, UserMixin, hash_password

    app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:"
    db = SQLAlchemy(app)

    fsqla.FsModels.set_db_info(db)

    class Role(db.Model, fsqla.FsRoleMixin):
        pass

    class User(db.Model, UserMixin):
        id = Column(Integer, primary_key=True)
        email = Column(String(255), unique=True, nullable=False)
        password = Column(String(255), nullable=False)
        active = Column(Boolean(), nullable=False)

        # Faster token checking
        fs_uniquifier = Column(String(64), unique=True, nullable=False)

        # 2FA
        tf_primary_method = Column(String(64), nullable=True)
        tf_totp_secret = Column(String(255), nullable=True)

        roles = relationship(
            "Role", secondary="roles_users", backref=backref("users", lazy="dynamic")
        )

    with app.app_context():
        db.create_all()

    ds = SQLAlchemyUserDatastore(db, User, Role)
    app.security = Security(app, datastore=ds)

    with app.app_context():
        ds.create_user(
            email="trp@lp.com",
            password=hash_password("password"),
        )
        ds.commit()

    client = app.test_client()
    data = dict(email="trp@lp.com", password="password")
    client.post("/login", data=data, follow_redirects=True)

    response = client.post("/tf-setup", data=dict(setup="email"), follow_redirects=True)
    msg = b"Enter code to complete setup"
    assert msg in response.data

    code = app.mail.outbox[0].body.split()[-1]
    # submit right token and show appropriate response
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    assert b"You successfully changed your two-factor method" in response.data

    with app.app_context():
        db.engine.dispose()


@pytest.mark.settings(two_factor_post_setup_view="/post_setup_view")
def test_post_setup_redirect(app, client):
    authenticate(client, "jill@lp.com")

    sms_sender = SmsSenderFactory.createSender("test")
    data = dict(setup="sms", phone="+442083661177")
    response = client.post("/tf-setup", data=data, follow_redirects=True)
    assert b"Enter code to complete setup" in response.data
    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]

    # Validate token - this should complete 2FA setup
    response = client.post("/tf-validate", data=dict(code=code), follow_redirects=False)
    assert check_location(app, response.location, "/post_setup_view")


@pytest.mark.app_settings(babel_default_locale="fr_FR")
@pytest.mark.babel()
def test_xlation(app, client, get_message_local):
    # Test method translation
    assert check_xlation(app, "fr_FR"), "You must run python setup.py compile_catalog"

    # login as gal2 which has 'authenticator' set up
    response = authenticate(client, email="gal2@lp.com", follow_redirects=True)
    with app.test_request_context():
        existing = (
            "Veuillez saisir votre code d'authentification généré via: authentificateur"
        )
        assert markupsafe.escape(existing).encode() in response.data
    with app.app_context():
        # generate 'code' as authenticator would and complete authentication
        user = app.security.datastore.find_user(email="gal2@lp.com")
        code = app.security._totp_factory.generate_totp_password(user.tf_totp_secret)
    client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
    response = client.get("/tf-setup", follow_redirects=True)
    with app.test_request_context():
        existing = "Méthode à deux facteurs actuellement configurée : authentificateur"
        assert markupsafe.escape(existing).encode() in response.data


@pytest.mark.csrf(ignore_unauth=True)
@pytest.mark.settings(two_factor_post_setup_view="/post_setup_view")
def test_setup_csrf(app, client):
    # Verify /tf-setup properly handles CSRF and template relays CSRF errors
    tf_authenticate(app, client)
    response = client.get("tf-setup")
    assert b"Disable" in response.data
    csrf_token = get_form_input_value(response, "csrf_token")

    response = client.post("tf-setup", data=dict(setup="disable"))
    assert b"The CSRF token is missing" in response.data

    response = client.post(
        "tf-setup", data=dict(setup="disable", csrf_token=csrf_token)
    )
    assert check_location(app, response.location, "/post_setup_view")


@pytest.mark.csrf(ignore_unauth=True, csrfprotect=True)
def test_setup_csrf_header(app, client):
    # Test that can setup using csrf token in header
    tf_authenticate(app, client)
    response = client.get("tf-setup", json=dict())
    csrf_token = response.json["response"]["csrf_token"]

    response = client.post("tf-setup", json=dict(setup="disable"))
    assert response.status_code == 400
    assert response.json["response"]["errors"][0] == "The CSRF token is missing."

    response = client.post(
        "tf-setup", json=dict(setup="disable"), headers={"X-CSRF-Token": csrf_token}
    )
    assert response.status_code == 200


@pytest.mark.csrf(csrfprotect=True)
@pytest.mark.settings(CSRF_COOKIE_NAME="XSRF-Token")
def test_csrf_2fa_login_cookie(app, client):
    # Use XSRF-Token cookie for entire login sequence
    sms_sender = SmsSenderFactory.createSender("test")
    response = client.get(
        "/login", data={}, headers={"Content-Type": "application/json"}
    )
    assert client.get_cookie("XSRF-Token")
    csrf_token = response.json["response"]["csrf_token"]
    assert csrf_token == client.get_cookie("XSRF-Token").value

    response = client.post(
        "/login",
        json=dict(email="gal@lp.com", password="password"),
        headers={
            "Content-Type": "application/json",
            "X-CSRF-Token": client.get_cookie("XSRF-Token").value,
        },
    )
    assert b'"code": 200' in response.data
    session = get_session(response)
    assert session["tf_state"] == "ready"

    assert sms_sender.get_count() == 1
    code = sms_sender.messages[0].split()[-1]

    response = client.post(
        "/tf-validate",
        json=dict(code=code),
        headers={
            "Content-Type": "application/json",
            "X-CSRF-Token": client.get_cookie("XSRF-Token").value,
        },
    )
    assert response.status_code == 200

    # verify original session csrf_token still works.
    response = client.post(
        "/json_auth",
        json=dict(label="label"),
        headers={"Content-Type": "application/json", "X-CSRF-Token": csrf_token},
    )
    assert response.status_code == 200

    # use XSRF_Cookie to send in csrf_token
    response = client.post(
        "/json_auth",
        json=dict(label="label"),
        headers={
            "Content-Type": "application/json",
            "X-CSRF-Token": client.get_cookie("XSRF-Token").value,
        },
    )
    assert response.status_code == 200
    assert response.json["label"] == "label"


@pytest.mark.csrf(ignore_unauth=True, csrfprotect=True)
@pytest.mark.settings(CSRF_COOKIE_NAME="XSRF-Token")
def test_csrf_2fa_nounauth_cookie(app, client):
    # use CSRF cookie when ignoring unauth endpoints
    sms_sender = SmsSenderFactory.createSender("test")
    response = client.post(
        "/login",
        json=dict(email="gal@lp.com", password="password"),
        headers={"Content-Type": "application/json"},
    )

    code = sms_sender.messages[0].split()[-1]
    response = client.post(
        "/tf-validate",
        json=dict(code=code),
        headers={"Content-Type": "application/json"},
    )
    assert response.status_code == 200

    response = client.post(
        "/json_auth",
        json=dict(label="label"),
        headers={
            "Content-Type": "application/json",
            "X-CSRF-Token": client.get_cookie("XSRF-Token").value,
        },
    )
    assert response.status_code == 200
    assert response.json["label"] == "label"
