1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440
|
"""
test_datastore
~~~~~~~~~~~~~~
Datastore tests
:copyright: (c) 2012 by Matt Wright.
:copyright: (c) 2019 by J. Christopher Wagner (jwag).
:license: MIT, see LICENSE for more details.
"""
import datetime
from pytest import raises, skip
from tests.test_utils import init_app_with_options, get_num_queries, is_sqlalchemy
from flask_security import RoleMixin, Security, UserMixin
from flask_security.datastore import Datastore, UserDatastore
class User(UserMixin):
pass
class Role(RoleMixin):
pass
class MockDatastore(UserDatastore):
def put(self, model):
pass
def delete(self, model):
pass
def test_unimplemented_datastore_methods():
datastore = Datastore(None)
assert datastore.db is None
with raises(NotImplementedError):
datastore.put(None)
with raises(NotImplementedError):
datastore.delete(None)
assert not datastore.commit()
def test_unimplemented_user_datastore_methods():
datastore = UserDatastore(None, None)
with raises(NotImplementedError):
datastore.find_user(None)
with raises(NotImplementedError):
datastore.find_role(None)
def test_toggle_active():
datastore = MockDatastore(None, None)
user = User()
user.active = True
assert datastore.toggle_active(user) is True
assert not user.active
assert datastore.toggle_active(user) is True
assert user.active is True
def test_deactivate_user():
datastore = MockDatastore(None, None)
user = User()
user.active = True
assert datastore.deactivate_user(user) is True
assert not user.active
def test_activate_user():
datastore = MockDatastore(None, None)
user = User()
user.active = False
assert datastore.activate_user(user) is True
assert user.active is True
def test_deactivate_returns_false_if_already_false():
datastore = UserDatastore(None, None)
user = User()
user.active = False
assert not datastore.deactivate_user(user)
def test_activate_returns_false_if_already_true():
datastore = UserDatastore(None, None)
user = User()
user.active = True
assert not datastore.activate_user(user)
def test_find_user(app, datastore):
init_app_with_options(app, datastore)
with app.app_context():
user_id = datastore.find_user(email="gene@lp.com").fs_uniquifier
current_nqueries = get_num_queries(datastore)
assert user_id == datastore.find_user(security_number=889900).fs_uniquifier
end_nqueries = get_num_queries(datastore)
if current_nqueries is not None:
if is_sqlalchemy(datastore):
# This should have done just 1 query across all attrs.
assert end_nqueries == (current_nqueries + 1)
assert user_id == datastore.find_user(username="gene").fs_uniquifier
def test_find_user_multikey(app, datastore):
init_app_with_options(app, datastore)
with app.app_context():
with raises(ValueError):
datastore.find_user(
case_insensitive=True, email="gene@lp.com", security_number=889900
)
def test_find_role(app, datastore):
init_app_with_options(app, datastore)
with app.app_context():
role = datastore.find_role("admin")
assert role is not None
role = datastore.find_role("bogus")
assert role is None
def test_add_role_to_user(app, datastore):
init_app_with_options(app, datastore)
with app.app_context():
# Test with user object
user = datastore.find_user(email="matt@lp.com")
assert user.has_role("editor") is False
assert datastore.add_role_to_user(user, "editor") is True
assert datastore.add_role_to_user(user, "editor") is False
assert user.has_role("editor") is True
# Test remove role
assert datastore.remove_role_from_user(user, "editor") is True
assert datastore.remove_role_from_user(user, "editor") is False
def test_create_user_with_roles(app, datastore):
init_app_with_options(app, datastore)
with app.app_context():
role = datastore.find_role("admin")
datastore.commit()
user = datastore.create_user(
email="dude@lp.com", username="dude", password="password", roles=[role]
)
datastore.commit()
current_nqueries = get_num_queries(datastore)
user = datastore.find_user(email="dude@lp.com")
assert user.has_role("admin") is True
end_nqueries = get_num_queries(datastore)
# Verify that getting user and role is just one DB query
assert current_nqueries is None or end_nqueries == (current_nqueries + 1)
def test_delete_user(app, datastore):
init_app_with_options(app, datastore)
with app.app_context():
user = datastore.find_user(email="matt@lp.com")
datastore.delete_user(user)
datastore.commit()
user = datastore.find_user(email="matt@lp.com")
assert user is None
def test_access_datastore_from_factory(app, datastore):
security = Security()
security.init_app(app, datastore)
assert security.datastore is not None
assert security.app is not None
def test_access_datastore_from_app_factory_pattern(app, datastore):
security = Security(datastore=datastore)
security.init_app(app)
assert security.datastore is not None
assert security.app is not None
def test_init_app_kwargs_override_constructor_kwargs(app, datastore):
security = Security(
datastore=datastore,
login_form="__init__login_form",
register_form="__init__register_form",
)
security.init_app(app, login_form="init_app_login_form")
assert security.login_form == "init_app_login_form"
assert security.register_form == "__init__register_form"
def test_create_user_with_roles_and_permissions(app, datastore):
ds = datastore
if not hasattr(ds.role_model, "permissions"):
return
init_app_with_options(app, datastore)
with app.app_context():
role = ds.create_role(name="test1", permissions={"read"})
ds.commit()
user = ds.create_user(
email="dude@lp.com", username="dude", password="password", roles=[role]
)
datastore.commit()
user = datastore.find_user(email="dude@lp.com")
assert user.has_role("test1") is True
assert user.has_permission("read") is True
assert user.has_permission("write") is False
def test_permissions_strings(app, datastore):
# Make sure spaces are squashed.
ds = datastore
if not hasattr(ds.role_model, "permissions"):
return
init_app_with_options(app, ds)
with app.app_context():
perms = "read, write "
ds.create_role(name="test1", permissions=perms)
ds.commit()
t1 = ds.find_role("test1")
assert {"read", "write"} == t1.get_permissions()
def test_permissions_iter(app, datastore):
# Test permissions as an interable
ds = datastore
if not hasattr(ds.role_model, "permissions"):
return
init_app_with_options(app, ds)
with app.app_context():
perms = ["read", "write"]
ds.create_role(name="test1", permissions=perms)
ds.commit()
t1 = ds.find_role("test1")
assert {"read", "write"} == t1.get_permissions()
def test_modify_permissions(app, datastore):
ds = datastore
if not hasattr(ds.role_model, "permissions"):
return
init_app_with_options(app, ds)
with app.app_context():
perms = {"read", "write"}
ds.create_role(name="test1", permissions=perms)
ds.commit()
t1 = ds.find_role("test1")
assert perms == t1.get_permissions()
if hasattr(t1, "update_datetime"):
orig_update_time = t1.update_datetime
assert t1.update_datetime <= datetime.datetime.utcnow()
ds.add_permissions_to_role(t1, "execute")
ds.commit()
t1 = ds.find_role("test1")
assert perms.union({"execute"}) == t1.get_permissions()
ds.remove_permissions_from_role(t1, "read")
ds.commit()
t1 = ds.find_role("test1")
assert {"write", "execute"} == t1.get_permissions()
if hasattr(t1, "update_datetime"):
assert t1.update_datetime > orig_update_time
def test_get_permissions(app, datastore):
""" Verify that role.permissions = None works. """
ds = datastore
if not hasattr(ds.role_model, "permissions"):
return
init_app_with_options(app, ds)
with app.app_context():
t1 = ds.find_role("simple")
assert set() == t1.get_permissions()
def test_modify_permissions_multi(app, datastore):
ds = datastore
if not hasattr(ds.role_model, "permissions"):
return
# N.B. right now only sqlalchemy has the extended RoleModel.
init_app_with_options(app, ds)
with app.app_context():
perms = ["read", "write"]
ds.create_role(name="test1", permissions=perms)
ds.commit()
t1 = ds.find_role("test1")
assert {"read", "write"} == t1.get_permissions()
# send in a list
ds.add_permissions_to_role(t1, ["execute", "whatever"])
ds.commit()
t1 = ds.find_role("test1")
assert {"read", "write", "execute", "whatever"} == t1.get_permissions()
ds.remove_permissions_from_role(t1, ["read", "whatever"])
ds.commit()
assert {"write", "execute"} == t1.get_permissions()
# send in a set
perms = {"read", "write"}
ds.create_role(name="test2", permissions=",".join(perms))
ds.commit()
t2 = ds.find_role("test2")
ds.add_permissions_to_role(t2, {"execute", "whatever"})
ds.commit()
t2 = ds.find_role("test2")
assert {"read", "write", "execute", "whatever"} == t2.get_permissions()
ds.remove_permissions_from_role(t2, {"read", "whatever"})
ds.commit()
assert {"write", "execute"} == t2.get_permissions()
def test_modify_permissions_unsupported(app, datastore):
from tests.conftest import PonyUserDatastore
ds = datastore
if hasattr(datastore.role_model, "permissions"):
# already tested this
return
if isinstance(datastore, PonyUserDatastore):
# sigh - Pony doesn't use RoleMixin.
return
init_app_with_options(app, ds)
with app.app_context():
ds.create_role(name="test3")
ds.commit()
t3 = ds.find_role("test3")
with raises(NotImplementedError):
t3.add_permissions("whatever")
with raises(NotImplementedError):
t3.remove_permissions("whatever")
def test_uuid(app, request, tmpdir, realdburl):
""" Test that UUID extension of postgresql works as a primary id for users """
import uuid
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Boolean, Column, DateTime, Integer, ForeignKey, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship, backref
from flask_security import SQLAlchemyUserDatastore
from tests.conftest import _setup_realdb, _teardown_realdb
# UUID type only supported by postgres - not sqlite.
if not realdburl or "postgres" not in realdburl:
skip("This test only works on postgres")
db_url, db_info = _setup_realdb(realdburl)
app.config["SQLALCHEMY_DATABASE_URI"] = db_url
db = SQLAlchemy(app)
class RolesUsers(db.Model):
__tablename__ = "roles_users"
id = Column(Integer(), primary_key=True)
user_id = Column("user_id", UUID(as_uuid=True), ForeignKey("user.id"))
role_id = Column("role_id", UUID(as_uuid=True), ForeignKey("role.id"))
class User(db.Model, UserMixin):
__tablename__ = "user"
id = Column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True
)
email = Column(String(255), unique=True)
fs_uniquifier = Column(String(64), unique=True, nullable=False)
first_name = Column(String(255), index=True)
last_name = Column(String(255), index=True)
username = Column(String(255), unique=True, nullable=True)
password = Column(String(255))
active = Column(Boolean())
created_at = Column(DateTime, default=datetime.datetime.utcnow)
confirmed_at = Column(DateTime())
roles = relationship(
"Role", secondary="roles_users", backref=backref("users", lazy="dynamic")
)
class Role(db.Model, RoleMixin):
__tablename__ = "role"
id = Column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True
)
name = Column(String(80), unique=True)
description = Column(String(255))
# __hash__ is required to avoid the exception
# TypeError: unhashable type: 'Role' when saving a User
def __hash__(self):
return hash(self.name)
with app.app_context():
db.create_all()
def tear_down():
db.drop_all()
_teardown_realdb(db_info)
request.addfinalizer(tear_down)
ds = SQLAlchemyUserDatastore(db, User, Role)
app.security = Security(app, datastore=ds)
with app.app_context():
user = ds.find_user(email="matt@lp.com")
assert not user
|