1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
|
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
import pytest
from passlib.context import CryptContext
from pwdlib.hashers.argon2 import Argon2Hasher as PwdlibArgon2Hasher
from sqlalchemy import Engine, String
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker
from sqlalchemy.orm import Mapped, Session, mapped_column, sessionmaker
from advanced_alchemy.base import BigIntBase
from advanced_alchemy.types import EncryptedString, PasswordHash
from advanced_alchemy.types.encrypted_string import FernetBackend, PGCryptoBackend
from advanced_alchemy.types.password_hash.argon2 import Argon2Hasher
from advanced_alchemy.types.password_hash.base import HashedPassword
from advanced_alchemy.types.password_hash.passlib import PasslibHasher
from advanced_alchemy.types.password_hash.pwdlib import PwdlibHasher
from tests.integration.test_models import DatabaseCapabilities
if TYPE_CHECKING:
from pytest import MonkeyPatch
# Module-level markers applied to every test in this file: each test is tagged
# as an integration test and assigned to the "password_hash" xdist group.
pytestmark = [pytest.mark.integration, pytest.mark.xdist_group("password_hash")]
# Model under test: one optional PasswordHash column per hashing backend.
class User(BigIntBase):
    """Test model exposing a ``PasswordHash`` column for each backend.

    The three password columns are optional and differ only in the hashing
    backend handed to ``PasswordHash``: passlib (argon2 scheme), the
    ``Argon2Hasher`` backend, and pwdlib (argon2 hasher).
    """

    __tablename__ = "test_user_password_hash"
    __table_args__ = {"info": {"allow_eager": True}}

    name: Mapped[str] = mapped_column(String(50))

    # Backend: passlib CryptContext restricted to the "argon2" scheme.
    passlib_password: Mapped[Optional[str]] = mapped_column(
        PasswordHash(
            backend=PasslibHasher(context=CryptContext(schemes=["argon2"])),
        ),
    )

    # Backend: Argon2Hasher constructed with its defaults.
    argon2_password: Mapped[Optional[str]] = mapped_column(
        PasswordHash(backend=Argon2Hasher()),
    )

    # Backend: pwdlib wrapper around pwdlib's own Argon2 hasher.
    pwdlib_password: Mapped[Optional[str]] = mapped_column(
        PasswordHash(
            backend=PwdlibHasher(hasher=PwdlibArgon2Hasher()),
        ),
    )
@pytest.fixture()
def password_test_tables(engine: Engine) -> None:
    """Create the tables registered on ``User.metadata`` for a sync engine.

    Nothing is created when the dialect name is ``"mock"`` or starts with
    ``"spanner"``. A dialect without a ``name`` attribute is treated as
    having an empty name.
    """
    dialect_name = getattr(engine.dialect, "name", "")
    if dialect_name == "mock" or dialect_name.startswith("spanner"):
        return
    User.metadata.create_all(engine)
@pytest.fixture()
async def password_test_tables_async(async_engine: AsyncEngine) -> None:
    """Create the tables registered on ``User.metadata`` for an async engine.

    Nothing is created when the dialect name is ``"mock"`` or starts with
    ``"spanner"``. A dialect without a ``name`` attribute is treated as
    having an empty name.
    """
    dialect_name = getattr(async_engine.dialect, "name", "")
    if dialect_name == "mock" or dialect_name.startswith("spanner"):
        return
    # ``create_all`` is a sync API, so it is run through the connection's
    # ``run_sync`` bridge inside a ``begin()`` block.
    async with async_engine.begin() as connection:
        await connection.run_sync(User.metadata.create_all)
def test_password_hash_sync(engine: Engine, password_test_tables: None, monkeypatch: MonkeyPatch) -> None:
    """Exercise the passlib and argon2 ``PasswordHash`` columns on a sync engine.

    Covers hashing on insert, verification of correct and incorrect
    passwords, rejection of non-string inputs, updating a password and
    resetting it to ``None``.
    """
    dialect_name = engine.dialect.name

    # Guard clauses: bail out early on dialects this test does not cover.
    if DatabaseCapabilities.should_skip_bigint(dialect_name):
        pytest.skip(f"{dialect_name} doesn't support bigint PKs well")
    if dialect_name == "mock":
        pytest.skip("Mock engine doesn't support auto-generated primary keys")
    if dialect_name.startswith("spanner"):
        pytest.skip("Spanner doesn't support direct UNIQUE constraints")
    if dialect_name.startswith("cockroach"):
        pytest.skip("CockroachDB doesn't support BigInt primary keys")

    make_session: sessionmaker[Session] = sessionmaker(engine, expire_on_commit=False)

    with make_session() as session:
        # --- passlib-backed column -------------------------------------
        passlib_user = User(name="user1", passlib_password="password123")
        session.add(passlib_user)
        session.flush()
        session.refresh(passlib_user)

        # After the round trip the attribute is a HashedPassword wrapper.
        assert isinstance(passlib_user.passlib_password, HashedPassword)
        passlib_hash = passlib_user.passlib_password  # type: ignore[unreachable]
        assert passlib_hash.hash_string.startswith("$argon2")
        assert passlib_hash.verify("password123")
        assert not passlib_hash.verify("wrong_password")

        # Non-string candidates must never verify.
        for candidate in (123, 123.45, True, None, ["password123"], {"password": "password123"}):
            assert not passlib_hash.verify(candidate)  # type: ignore[arg-type]

        # --- argon2-backed column --------------------------------------
        argon2_user = User(name="user2", argon2_password="secret123")
        session.add(argon2_user)
        session.flush()
        session.refresh(argon2_user)

        assert isinstance(argon2_user.argon2_password, HashedPassword)
        argon2_hash = argon2_user.argon2_password
        assert argon2_hash.hash_string.startswith("$argon2")
        assert argon2_hash.verify("secret123")
        assert not argon2_hash.verify("wrong_secret")

        # Non-string candidates must never verify with argon2 either.
        for candidate in (123, 123.45, True, None, ["secret123"], {"password": "secret123"}):
            assert not argon2_hash.verify(candidate)  # type: ignore[arg-type]

        # --- updating the password -------------------------------------
        argon2_user.argon2_password = "newsecret123"
        session.flush()
        session.refresh(argon2_user)

        # Re-read the attribute: the refresh replaced the stored value.
        assert isinstance(argon2_user.argon2_password, HashedPassword)
        assert argon2_user.argon2_password.verify("newsecret123")
        assert not argon2_user.argon2_password.verify("secret123")

        # --- clearing the password -------------------------------------
        argon2_user.argon2_password = None
        session.flush()
        session.refresh(argon2_user)
        assert argon2_user.argon2_password is None
async def test_password_hash_async(
    async_engine: AsyncEngine, password_test_tables_async: None, monkeypatch: MonkeyPatch
) -> None:
    """Test password hashing with Argon2 and Passlib backends using async engines.

    Covers hashing on insert, verification of correct and incorrect
    passwords, rejection of non-string inputs (the same checks the sync
    test performs), updating a password and resetting it to ``None``.
    """
    # Skip for unsupported backends
    if DatabaseCapabilities.should_skip_bigint(async_engine.dialect.name):
        pytest.skip(f"{async_engine.dialect.name} doesn't support bigint PKs well")

    # Skip mock engine - it doesn't support auto-generated primary keys
    if async_engine.dialect.name == "mock":
        pytest.skip("Mock engine doesn't support auto-generated primary keys")

    # Skip Spanner - doesn't support direct UNIQUE constraints
    if async_engine.dialect.name.startswith("spanner"):
        pytest.skip("Spanner doesn't support direct UNIQUE constraints")

    # Skip CockroachDB - it doesn't support BigInt primary keys
    if async_engine.dialect.name.startswith("cockroach"):
        pytest.skip("CockroachDB doesn't support BigInt primary keys")

    session_factory = async_sessionmaker(async_engine, expire_on_commit=False)

    # Test with async session
    async with session_factory() as db_session:
        # Create user with passlib password
        user1 = User(name="user1_async", passlib_password="password123")
        db_session.add(user1)
        await db_session.flush()
        await db_session.refresh(user1)

        # Verify password hash is created correctly
        assert isinstance(user1.passlib_password, HashedPassword)
        assert user1.passlib_password.hash_string.startswith("$argon2")  # type: ignore[unreachable]
        assert user1.passlib_password.verify("password123")
        assert not user1.passlib_password.verify("wrong_password")

        # Test non-string password inputs (kept in step with the sync test)
        for invalid_input in (123, 123.45, True, None, ["password123"], {"password": "password123"}):
            assert not user1.passlib_password.verify(invalid_input)  # type: ignore[arg-type]

        # Create user with argon2 password
        user2 = User(name="user2_async", argon2_password="secret123")
        db_session.add(user2)
        await db_session.flush()
        await db_session.refresh(user2)

        # Verify password hash is created correctly
        assert isinstance(user2.argon2_password, HashedPassword)
        assert user2.argon2_password.hash_string.startswith("$argon2")
        assert user2.argon2_password.verify("secret123")
        assert not user2.argon2_password.verify("wrong_secret")

        # Test non-string password inputs with argon2 (kept in step with the sync test)
        for invalid_input in (123, 123.45, True, None, ["secret123"], {"password": "secret123"}):
            assert not user2.argon2_password.verify(invalid_input)  # type: ignore[arg-type]

        # Test updating password
        user2.argon2_password = "newsecret123"
        await db_session.flush()
        await db_session.refresh(user2)
        assert isinstance(user2.argon2_password, HashedPassword)
        assert user2.argon2_password.verify("newsecret123")
        assert not user2.argon2_password.verify("secret123")

        # Test setting password to None
        user2.argon2_password = None
        await db_session.flush()
        await db_session.refresh(user2)
        assert user2.argon2_password is None
def test_password_hash_repr() -> None:
    """Check ``repr()`` of ``PasswordHash`` for each hashing backend."""
    # Each entry pairs a configured column type with the exact repr expected.
    expectations = (
        (
            PasswordHash(backend=Argon2Hasher(), length=128),
            "PasswordHash(backend=sa.Argon2Hasher(), length=128)",
        ),
        (
            PasswordHash(backend=PasslibHasher(context=CryptContext(schemes=["argon2"])), length=256),
            "PasswordHash(backend=sa.PasslibHasher(), length=256)",
        ),
        (
            PasswordHash(backend=PwdlibHasher(hasher=PwdlibArgon2Hasher()), length=512),
            "PasswordHash(backend=sa.PwdlibHasher(), length=512)",
        ),
    )
    for column_type, expected_repr in expectations:
        assert repr(column_type) == expected_repr
def test_encrypted_string_repr() -> None:
    """Check ``repr()`` of ``EncryptedString`` for str, bytes and callable keys."""

    def get_key() -> str:
        return "dynamic_key"

    # String key with the Fernet backend and an explicit length.
    fernet_type = EncryptedString(key="test_key", backend=FernetBackend, length=100)
    # Bytes key with the pgcrypto backend.
    pgcrypto_type = EncryptedString(key=b"test_bytes_key", backend=PGCryptoBackend, length=200)
    # Callable key, no length given.
    callable_key_type = EncryptedString(key=get_key, backend=FernetBackend)

    assert repr(fernet_type) == "EncryptedString(key='test_key', backend=FernetBackend, length=100)"
    assert repr(pgcrypto_type) == "EncryptedString(key=b'test_bytes_key', backend=PGCryptoBackend, length=200)"
    # The expected string refers to the callable by its name and shows the
    # length as None.
    assert repr(callable_key_type) == "EncryptedString(key=get_key, backend=FernetBackend, length=None)"
|