1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370
|
# -*- coding: utf-8 -*-
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
# more details.
#
# You should have received a copy of the GNU General Public License version 3
# along with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# (c) 2016-205 Valentin Samir
"""Tests module for models"""
from cas_server.default_settings import settings
import django
from django.test import TestCase, Client
from django.test.utils import override_settings
from django.utils import timezone
from django.core import mail
from unittest import mock
from datetime import timedelta
from importlib import import_module
from cas_server import models, utils
from cas_server.tests.utils import get_auth_client, HttpParamsHandler
from cas_server.tests.mixin import UserModels, BaseServicePattern, FederatedIendityProviderModel
from cas_server.tests.test_federate import PROVIDERS
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore
class FederatedUserTestCase(TestCase, UserModels, FederatedIendityProviderModel):
"""test for the federated user model"""
def setUp(self):
"""Prepare the test context"""
self.setup_federated_identity_provider(PROVIDERS)
def test_clean_old_entries(self):
"""tests for clean_old_entries that should delete federated user no longer used"""
client = Client()
client.get("/login")
provider = models.FederatedIendityProvider.objects.get(suffix="example.com")
models.FederatedUser.objects.create(
username="test1", provider=provider, attributs={}, ticket=""
)
models.FederatedUser.objects.create(
username="test2", provider=provider, attributs={}, ticket=""
)
models.FederatedUser.objects.all().update(
last_update=(timezone.now() - timedelta(seconds=settings.CAS_TICKET_TIMEOUT + 10))
)
models.FederatedUser.objects.create(
username="test3", provider=provider, attributs={}, ticket=""
)
models.User.objects.create(
username="test1@example.com", session_key=client.session.session_key
)
self.assertEqual(len(models.FederatedUser.objects.all()), 3)
models.FederatedUser.clean_old_entries()
self.assertEqual(len(models.FederatedUser.objects.all()), 2)
with self.assertRaises(models.FederatedUser.DoesNotExist):
models.FederatedUser.objects.get(username="test2")
def test_json_attributes(self):
"""test the json storage of ``atrributs`` in ``_attributs``"""
provider = models.FederatedIendityProvider.objects.get(suffix="example.com")
user = models.FederatedUser.objects.create(
username=settings.CAS_TEST_USER,
provider=provider,
attributs=settings.CAS_TEST_ATTRIBUTES,
ticket=""
)
self.assertEqual(utils.json_encode(settings.CAS_TEST_ATTRIBUTES), user._attributs)
user.delete()
user = models.FederatedUser.objects.create(
username=settings.CAS_TEST_USER,
provider=provider,
ticket=""
)
self.assertIsNone(user._attributs)
self.assertIsNone(user.attributs)
class FederateSLOTestCase(TestCase, UserModels):
"""test for the federated SLO model"""
def test_clean_deleted_sessions(self):
"""
tests for clean_deleted_sessions that should delete object for which matching session
do not exists anymore
"""
if django.VERSION >= (1, 8):
client1 = Client()
client2 = Client()
client1.get("/login")
client2.get("/login")
session = client2.session
session['authenticated'] = True
session.save()
models.FederateSLO.objects.create(
username="test1@example.com",
session_key=client1.session.session_key,
ticket=utils.gen_st()
)
models.FederateSLO.objects.create(
username="test2@example.com",
session_key=client2.session.session_key,
ticket=utils.gen_st()
)
self.assertEqual(len(models.FederateSLO.objects.all()), 2)
models.FederateSLO.clean_deleted_sessions()
self.assertEqual(len(models.FederateSLO.objects.all()), 1)
with self.assertRaises(models.FederateSLO.DoesNotExist):
models.FederateSLO.objects.get(username="test1@example.com")
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
class UserAttributesTestCase(TestCase, UserModels):
"""test for the user attributes cache model"""
def test_clean_old_entries(self):
"""test the clean_old_entries methode"""
client = get_auth_client()
user = self.get_user(client)
models.UserAttributes.objects.create(username=settings.CAS_TEST_USER)
# test that attribute cache is removed for non existant users
self.assertEqual(len(models.UserAttributes.objects.all()), 1)
models.UserAttributes.clean_old_entries()
self.assertEqual(len(models.UserAttributes.objects.all()), 1)
user.delete()
models.UserAttributes.clean_old_entries()
self.assertEqual(len(models.UserAttributes.objects.all()), 0)
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
class UserTestCase(TestCase, UserModels):
"""tests for the user models"""
def setUp(self):
"""Prepare the test context"""
self.service = 'http://127.0.0.1:45678'
self.service_pattern = models.ServicePattern.objects.create(
name="localhost",
pattern=r"^https?://127\.0\.0\.1(:[0-9]+)?(/.*)?$",
single_log_out=True
)
models.ReplaceAttributName.objects.create(name="*", service_pattern=self.service_pattern)
def test_clean_old_entries(self):
"""test clean_old_entries"""
# get an authenticated client
client = self.expire_user()
# assert the user exists before being cleaned
self.assertEqual(len(models.User.objects.all()), 1)
# assert the last activity date is before the expiry date
self.assertTrue(
self.get_user(client).date < (
timezone.now() - timedelta(seconds=settings.SESSION_COOKIE_AGE)
)
)
# delete old inactive users
models.User.clean_old_entries()
# assert the user has being well delete
self.assertEqual(len(models.User.objects.all()), 0)
@override_settings(CAS_TGT_VALIDITY=3600)
def test_clean_old_entries_tgt_expired(self):
"""test clean_old_entiers with CAS_TGT_VALIDITY set"""
# get an authenticated client
client = self.tgt_expired_user(settings.CAS_TGT_VALIDITY + 60)
# assert the user exists before being cleaned
self.assertEqual(len(models.User.objects.all()), 1)
# assert the last lofin date is before the expiry date
self.assertTrue(
self.get_user(client).last_login < (
timezone.now() - timedelta(seconds=settings.CAS_TGT_VALIDITY)
)
)
# delete old inactive users
models.User.clean_old_entries()
# assert the user has being well delete
self.assertEqual(len(models.User.objects.all()), 0)
def test_clean_deleted_sessions(self):
"""test clean_deleted_sessions"""
# get an authenticated client
client1 = get_auth_client()
client2 = get_auth_client()
# generate a ticket to fire SLO during user cleaning (SLO should fail a nothing listen
# on self.service)
ticket = self.get_user(client1).get_ticket(
models.ServiceTicket,
self.service,
self.service_pattern,
renew=False
)
ticket.validate = True
ticket.save()
# simulated expired session being garbage collected for client1
session = SessionStore(session_key=client1.session.session_key)
session.flush()
# assert the user exists before being cleaned
self.assertTrue(self.get_user(client1))
self.assertTrue(self.get_user(client2))
self.assertEqual(len(models.User.objects.all()), 2)
# session has being remove so the user of client1 is no longer authenticated
self.assertFalse(client1.session.get("authenticated"))
# the user a client2 should still be authenticated
self.assertTrue(client2.session.get("authenticated"))
# the user should be deleted
models.User.clean_deleted_sessions()
# assert the user with expired sessions has being well deleted but the other remain
self.assertEqual(len(models.User.objects.all()), 1)
self.assertFalse(models.ServiceTicket.objects.all())
self.assertTrue(client2.session.get("authenticated"))
@override_settings(CAS_AUTH_CLASS='cas_server.tests.auth.TestCachedAttributesAuthUser')
def test_cached_attributs(self):
"""
Test gettting user attributes from cache for auth method that do not support direct
fetch (link the ldap bind auth methode)
"""
client = get_auth_client()
user = self.get_user(client)
# if no cache is defined, the attributes are empty
self.assertEqual(user.attributs, {})
user_attr = models.UserAttributes.objects.create(username=settings.CAS_TEST_USER)
# if a cache is defined but without atrributes, also empty
self.assertEqual(user.attributs, {})
user_attr.attributs = settings.CAS_TEST_ATTRIBUTES
user_attr.save()
# attributes are what is found in the cache
self.assertEqual(user.attributs, settings.CAS_TEST_ATTRIBUTES)
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
class TicketTestCase(TestCase, UserModels, BaseServicePattern):
"""tests for the tickets models"""
def setUp(self):
"""Prepare the test context"""
self.setup_service_patterns()
self.service = 'http://127.0.0.1:45678'
self.service_pattern = models.ServicePattern.objects.create(
name="localhost",
pattern=r"^https?://127\.0\.0\.1(:[0-9]+)?(/.*)?$",
single_log_out=True
)
models.ReplaceAttributName.objects.create(name="*", service_pattern=self.service_pattern)
@staticmethod
def get_ticket(
user,
ticket_class,
service,
service_pattern,
renew=False,
validate=False,
validity_expired=False,
timeout_expired=False,
single_log_out=False,
):
"""Return a ticket"""
ticket = user.get_ticket(ticket_class, service, service_pattern, renew)
ticket.validate = validate
ticket.single_log_out = single_log_out
if validity_expired:
ticket.creation = min(
ticket.creation,
(timezone.now() - timedelta(seconds=(ticket_class.VALIDITY + 10)))
)
if timeout_expired:
ticket.creation = min(
ticket.creation,
(timezone.now() - timedelta(seconds=(ticket_class.TIMEOUT + 10)))
)
ticket.save()
return ticket
def test_clean_old_service_ticket(self):
"""test tickets clean_old_entries"""
# ge an authenticated client
client = get_auth_client()
# get the user associated to the client
user = self.get_user(client)
# generate a ticket for that client, waiting for validation
self.get_ticket(user, models.ServiceTicket, self.service, self.service_pattern)
# generate another ticket for those validation time has expired
self.get_ticket(
user, models.ServiceTicket,
self.service, self.service_pattern, validity_expired=True
)
(httpd, host, port) = HttpParamsHandler.run()[0:3]
service = "http://%s:%s" % (host, port)
# generate a ticket with SLO having timeout reach
self.get_ticket(
user, models.ServiceTicket,
service, self.service_pattern, timeout_expired=True,
validate=True, single_log_out=True
)
# there should be 3 tickets in the db
self.assertEqual(len(models.ServiceTicket.objects.all()), 3)
# we call the clean_old_entries method that should delete validated non SLO ticket and
# expired non validated ticket and send SLO for SLO expired ticket before deleting then
models.ServiceTicket.clean_old_entries()
params = httpd.PARAMS
# we successfully got a SLO request
self.assertTrue(b'logoutRequest' in params and params[b'logoutRequest'])
# only 1 ticket remain in the db
self.assertEqual(len(models.ServiceTicket.objects.all()), 1)
def test_json_attributes(self):
"""test the json storage of ``atrributs`` in ``_attributs``"""
# ge an authenticated client
client = get_auth_client()
# get the user associated to the client
user = self.get_user(client)
ticket = models.ServiceTicket.objects.create(
user=user,
service=self.service,
attributs=settings.CAS_TEST_ATTRIBUTES,
service_pattern=self.service_pattern
)
self.assertEqual(utils.json_encode(settings.CAS_TEST_ATTRIBUTES), ticket._attributs)
ticket.delete()
ticket = models.ServiceTicket.objects.create(
user=user,
service=self.service,
service_pattern=self.service_pattern
)
self.assertIsNone(ticket._attributs)
self.assertIsNone(ticket.attributs)
@mock.patch("cas_server.utils.last_version", lambda: "1.2.3")
@override_settings(ADMINS=[("Ano Nymous", "ano.nymous@example.net")])
@override_settings(CAS_NEW_VERSION_EMAIL_WARNING=True)
class NewVersionWarningTestCase(TestCase):
"""tests for the new version warning model"""
@mock.patch("cas_server.models.VERSION", "0.1.2")
def test_send_mails(self):
"""test the send_mails method with ADMINS and a new version available"""
models.NewVersionWarning.send_mails()
self.assertEqual(len(mail.outbox), 1)
self.assertEqual(
mail.outbox[0].subject,
'%sA new version of django-cas-server is available' % settings.EMAIL_SUBJECT_PREFIX
)
models.NewVersionWarning.send_mails()
self.assertEqual(len(mail.outbox), 1)
@mock.patch("cas_server.models.VERSION", "1.2.3")
def test_send_mails_same_version(self):
"""test the send_mails method with with current version being the last"""
models.NewVersionWarning.objects.create(version="0.1.2")
models.NewVersionWarning.send_mails()
self.assertEqual(len(mail.outbox), 0)
@override_settings(ADMINS=[])
def test_send_mails_no_admins(self):
"""test the send_mails method without ADMINS"""
models.NewVersionWarning.send_mails()
self.assertEqual(len(mail.outbox), 0)
@override_settings(CAS_NEW_VERSION_EMAIL_WARNING=False)
def test_send_mails_disabled(self):
"""test the send_mails method if disabled"""
models.NewVersionWarning.send_mails()
self.assertEqual(len(mail.outbox), 0)
|