# Tests for the SQLAlchemy-backed Flask-Session interface.
import json
from contextlib import contextmanager
import flask
import pytest
from flask_session.sqlalchemy import SqlAlchemySession
from sqlalchemy import text
class TestSQLAlchemy:
    """Tests for the SQLAlchemy-backed session interface.

    This requires package: sqlalchemy
    """

    @contextmanager
    def setup_sqlalchemy(self, app):
        """Empty the ``sessions`` table before and after the wrapped block.

        Args:
            app: Application whose ``session_interface.client.session`` is
                used to run the SQL statements.

        Yields:
            None. Control returns to the ``with`` body once the table has
            been emptied and the deletion committed.
        """
        db_session = app.session_interface.client.session
        try:
            db_session.execute(text("DELETE FROM sessions"))
            db_session.commit()
            yield
        finally:
            db_session.execute(text("DELETE FROM sessions"))
            # Commit before closing: closing with a pending transaction would
            # discard the DELETE instead of persisting it.
            db_session.commit()
            db_session.close()

    def retrieve_stored_session(self, key, app):
        """Return the stored ``data`` of the session row matching ``key``.

        Args:
            key: Value compared against the model's ``session_id`` column.
            app: Application whose session interface exposes the database
                client and ``sql_session_model``.

        Returns:
            The ``data`` attribute of the first matching row, or ``None``
            when no row matches.
        """
        interface = app.session_interface
        session_model = (
            interface.client.session.query(interface.sql_session_model)
            .filter_by(session_id=key)
            .first()
        )
        if session_model:
            return session_model.data
        return None

    @pytest.mark.filterwarnings("ignore:No valid SQLAlchemy instance provided")
    def test_use_signer(self, app_utils):
        """Check that sessions use the SQLAlchemy backend and are persisted."""
        app = app_utils.create_app(
            {
                "SESSION_TYPE": "sqlalchemy",
                "SQLALCHEMY_DATABASE_URI": "sqlite:///",
            }
        )
        # Context managers must be comma-separated. Chaining them with ``and``
        # evaluates a boolean expression first and enters only the last
        # operand, silently skipping the app context and the table cleanup.
        with app.app_context(), self.setup_sqlalchemy(app), app.test_request_context():
            assert isinstance(
                flask.session,
                SqlAlchemySession,
            )
            app_utils.test_session(app)

            # Check if the session is stored in SQLAlchemy
            cookie = app_utils.test_session_with_cookie(app)
            # The first cookie attribute is "<name>=<value>"; partition keeps
            # any "=" that is part of the value itself.
            session_id = cookie.split(";")[0].partition("=")[2]
            byte_string = self.retrieve_stored_session(f"session:{session_id}", app)
            stored_session = (
                json.loads(byte_string.decode("utf-8")) if byte_string else {}
            )
            assert stored_session.get("value") == "44"
|