1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
#
# Copyright (C) 2021 FreeIPA Contributors see COPYING for license
#
"""
Test the LDAPCache class.
"""
# pylint: disable=no-member
from ipalib import api, errors
from ipapython import ipaldap
from ipapython.dn import DN
import pytest
def hits_and_misses(cache, hits, misses):
assert cache._cache_hits == hits
assert cache._cache_misses == misses
@pytest.fixture(scope='class')
@pytest.mark.tier1
@pytest.mark.needs_ipaapi
def class_cache(request):
cache = ipaldap.LDAPCache(api.env.ldap_uri)
hits_and_misses(cache, 0, 0)
request.cls.cache = cache
request.cls.userdn = DN(
'uid=testuser', api.env.container_user, api.env.basedn
)
rpcclient = api.Backend.rpcclient
was_connected = rpcclient.isconnected()
if not was_connected:
rpcclient.connect()
api.Command.user_add('testuser', givenname=u'Test', sn=u'User')
yield
try:
api.Command.user_del('testuser')
except Exception:
pass
try:
if not was_connected:
rpcclient.disconnect()
except Exception:
pass
@pytest.mark.usefixtures('class_cache')
@pytest.mark.skip_ipaclient_unittest
@pytest.mark.needs_ipaapi
class TestLDAPCache:
def test_one(self):
dn = DN('uid=notfound', api.env.container_user, api.env.basedn)
try:
self.cache.get_entry(dn)
except errors.EmptyResult:
pass
assert dn in self.cache.cache
exc = self.cache.cache[dn].exception
assert isinstance(exc, errors.EmptyResult)
hits_and_misses(self.cache, 0, 1)
def test_retrieve_exception(self):
dn = DN('uid=notfound', api.env.container_user, api.env.basedn)
try:
self.cache.get_entry(dn)
except errors.EmptyResult:
pass
assert dn in self.cache.cache
exc = self.cache.cache[dn].exception
assert isinstance(exc, errors.EmptyResult)
hits_and_misses(self.cache, 1, 1)
def test_get_testuser(self):
assert self.userdn not in self.cache.cache
self.cache.get_entry(self.userdn)
assert self.userdn in self.cache.cache
hits_and_misses(self.cache, 1, 2)
def test_get_testuser_again(self):
assert self.userdn in self.cache.cache
# get the user again with with no attributes requested (so all)
self.cache.get_entry(self.userdn)
hits_and_misses(self.cache, 2, 2)
# Now get the user with a subset of cached attributes
entry = self.cache.get_entry(self.userdn, ('givenname', 'sn', 'cn'))
# Make sure we only got three attributes, as requested
assert len(entry.items()) == 3
hits_and_misses(self.cache, 3, 2)
def test_update_testuser(self):
entry = self.cache.cache[self.userdn].entry
try:
self.cache.update_entry(entry)
except errors.EmptyModlist:
pass
assert self.userdn not in self.cache.cache
hits_and_misses(self.cache, 3, 2)
def test_modify_testuser(self):
self.cache.get_entry(self.userdn)
entry = self.cache.cache[self.userdn].entry
try:
self.cache.modify_s(entry.dn, [])
except errors.EmptyModlist:
pass
assert self.userdn not in self.cache.cache
hits_and_misses(self.cache, 3, 3)
def test_delete_entry(self):
# We don't care if this is successful or not, just that the
# cache doesn't retain the deleted entry
try:
self.cache.delete_entry(self.userdn)
except Exception:
pass
assert self.userdn not in self.cache.cache
hits_and_misses(self.cache, 3, 3)
def test_add_entry(self):
# We don't care if this is successful or not, just that the
# cache doesn't get the added entry
try:
self.cache.add_entry(self.userdn)
except Exception:
pass
assert self.userdn not in self.cache.cache
hits_and_misses(self.cache, 3, 3)
def test_clear_cache(self):
self.cache.clear_cache()
hits_and_misses(self.cache, 0, 0)
|