1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
|
#
# Copyright (C) 2025 Red Hat
# see file 'COPYING' for use and warranty information
from ipalib import api, errors
from ipapython.dn import DN
from ipatests.test_xmlrpc.xmlrpc_test import (fuzzy_set_optional_oc,
fuzzy_string)
from ipatests.test_xmlrpc.tracker.base import Tracker
from ipatests.util import assert_deepequal
class SysaccountTracker(Tracker):
    """Tracker for system accounts used by the sysaccount plugin tests.

    The tracker keeps the expected state of a single system account in
    ``self.attrs`` and builds the ``sysaccount_*`` commands needed to
    create, retrieve, find, update and delete it, together with the
    matching ``check_*`` helpers that compare command results with the
    expected state.
    """

    # Keys expected from 'sysaccount_show' without --all.
    retrieve_keys = {
        'dn', 'uid', 'description', 'memberof', 'nsaccountlock', 'has_password'
    }
    # Keys expected from 'sysaccount_show' with --all.
    retrieve_all_keys = retrieve_keys | {
        'objectclass', 'ipauniqueid', 'privileged'
    }
    # Keys compared for 'sysaccount_add'; the password attributes are added
    # and 'has_password' / 'ipauniqueid' are left out of the comparison.
    create_keys = (
        retrieve_all_keys | {'randompassword', 'userpassword'}
    ) - {'has_password', 'ipauniqueid'}
    # Keys compared for 'sysaccount_mod'.
    update_keys = retrieve_keys - {'dn'}
    # Keys compared for 'sysaccount_find' (without / with --all).
    find_keys = retrieve_keys - {'has_password'}
    find_all_keys = retrieve_all_keys - {'has_password'}

    primary_keys = {'uid', 'dn'}

    def __init__(self, name, description=None, **kwargs):
        """Set up a tracker for the system account called ``name``.

        :param name: uid of the system account
        :param description: optional description passed to sysaccount_add
        :param kwargs: extra options stored and passed to sysaccount_add
        """
        super().__init__(default_version=None)
        self.uid = name
        self.description = description
        self.dn = DN(('uid', self.uid), api.env.container_sysaccounts,
                     api.env.basedn)
        self.kwargs = kwargs

    def make_create_command(self, random=True, **kwargs):
        """Make function that creates a sysaccount using sysaccount_add

        Options are merged in this order: ``random``, the keyword
        arguments given to the constructor, ``kwargs`` and finally the
        tracker's description (when one is set).
        """
        options = dict(random=random, **self.kwargs)
        options.update(kwargs)
        if self.description:
            options['description'] = self.description
        return self.make_command('sysaccount_add', self.uid, **options)

    def make_delete_command(self):
        """Make function that deletes a sysaccount using sysaccount_del"""
        return self.make_command('sysaccount_del', self.uid)

    def make_retrieve_command(self, all=False, raw=False):
        """Make function that retrieves a sysaccount using sysaccount_show

        NOTE: ``raw`` is accepted but is not forwarded to the command;
        only ``all`` is passed on.
        """
        return self.make_command('sysaccount_show', self.uid, all=all)

    def make_find_command(self, *args, **kwargs):
        """Make function that searches for sysaccounts using sysaccount_find"""
        return self.make_command('sysaccount_find', *args, **kwargs)

    def make_update_command(self, updates):
        """Make function that updates a sysaccount using sysaccount_mod"""
        return self.make_command('sysaccount_mod', self.uid, **updates)

    def create(self, force=False):
        """Helper function to create an entry and check the result

        :param force: when true, call ``ensure_missing()`` first so the
            entry is created from scratch.

        When the tracker believes the entry exists, this is verified
        with sysaccount_show: if the entry is really there nothing is
        done, otherwise the ``exists`` flag is reset and the entry is
        created. A ``DuplicateEntry`` error from sysaccount_add is not
        treated as a failure; the expected attributes are refreshed from
        the entry that is already present instead.
        """
        if force:
            # Start from a clean state.
            self.ensure_missing()
        elif self.exists:
            try:
                self.make_retrieve_command(all=True)()
            except errors.NotFound:
                # Entry doesn't actually exist, reset our state
                self.exists = False
            else:
                # Entry exists, don't try to create it again
                return

        self.track_create()
        command = self.make_create_command()
        try:
            result = command()
        except errors.DuplicateEntry:
            # Entry already exists, update our state to reflect this
            self.exists = True
            self._refresh_attrs_from_server()
        else:
            self.check_create(result)

    def _refresh_attrs_from_server(self):
        """Overwrite expected attrs with values from sysaccount_show --all"""
        retrieved = self.make_retrieve_command(all=True)()['result']
        for key, value in retrieved.items():
            if key not in self.create_keys:
                continue
            if key == 'dn':
                # track_create() stores the DN unwrapped; keep it that way
                # so later comparisons do not see a one-element list.
                self.attrs[key] = value
            elif isinstance(value, (list, tuple)):
                self.attrs[key] = list(value)
            else:
                self.attrs[key] = [value]

    def _normalize_result_dict(self, result_dict):
        """Normalize a result dictionary to match expected format

        Tuples become lists, booleans and a string ``randompassword``
        are wrapped in a one-element list, and ``dn`` as well as every
        other value is passed through unchanged.
        """
        normalized = {}
        for key, value in result_dict.items():
            if key == 'dn':
                # DN is handled by assert_deepequal
                normalized[key] = value
            elif isinstance(value, tuple):
                normalized[key] = list(value)
            elif isinstance(value, bool):
                # Expected attrs store booleans as one-element lists
                normalized[key] = [value]
            elif key == 'randompassword' and isinstance(value, str):
                # Expected attrs hold randompassword as a list
                normalized[key] = [value]
            else:
                normalized[key] = value
        return normalized

    def track_create(self):
        """Updates expected state for sysaccount creation"""
        self.attrs = dict(
            dn=self.dn,
            uid=[self.uid],
            objectclass=fuzzy_set_optional_oc(
                ['top', 'account', 'simplesecurityobject'],
                'nsmemberof'),
            nsaccountlock=[False],
            privileged=[False],
            randompassword=[fuzzy_string],
        )
        if self.description:
            self.attrs['description'] = [self.description]
        self.exists = True

    def check_create(self, result):
        """Checks 'sysaccount_add' command result

        Any ``messages`` entry in the result is ignored, and only the
        keys listed in ``create_keys`` are compared.
        """
        # Work on a shallow copy so the caller's result is not modified.
        actual = dict(result)
        actual.pop('messages', None)

        expected = dict(
            value=self.uid,
            summary=u'Added system account "%s"' % self.uid,
            result=self.filter_attrs(self.create_keys),
        )
        relevant = {
            key: value
            for key, value in actual.get('result', {}).items()
            if key in self.create_keys
        }
        filtered = {
            'value': actual.get('value'),
            'summary': actual.get('summary'),
            'result': self._normalize_result_dict(relevant),
        }
        assert_deepequal(expected, filtered)

    def check_delete(self, result):
        """Checks 'sysaccount_del' command result"""
        assert_deepequal(dict(
            value=[self.uid],
            summary=u'Deleted system account "%s"' % self.uid,
            result=dict(failed=[])
        ), result)

    def check_retrieve(self, result, all=False, raw=False):
        """Checks 'sysaccount_show' command result

        Only the keys in ``retrieve_all_keys`` (when ``all`` is true) or
        ``retrieve_keys`` are compared; other keys in the result are
        ignored. ``raw`` is accepted but not used.
        """
        expected_keys = self.retrieve_all_keys if all else self.retrieve_keys
        filtered = {
            'value': result.get('value'),
            'summary': result.get('summary'),
            'result': {
                key: value
                for key, value in result.get('result', {}).items()
                if key in expected_keys
            },
        }
        assert_deepequal(dict(
            value=self.uid,
            summary=None,
            result=self.filter_attrs(expected_keys)
        ), filtered)

    def check_find(self, result, all=False, raw=False):
        """Checks 'sysaccount_find' command result

        The result may list several entries; the one whose uid matches
        this tracker is compared with the expected attributes.
        ``raw`` is accepted but not used.

        :raises AssertionError: if no entry with this uid is in the result
        """
        find_keys = self.find_all_keys if all else self.find_keys
        expected = self.filter_attrs(find_keys)

        for entry in result['result']:
            uid_value = entry.get('uid')
            # Handle both tuple and list formats
            if isinstance(uid_value, tuple):
                uid_value = list(uid_value)
            if uid_value == [self.uid]:
                assert_deepequal(expected, entry)
                return
        # Raised explicitly so the check also works under ``python -O``.
        raise AssertionError(f"Sysaccount {self.uid} not found in result")

    def check_update(self, result, extra_keys=()):
        """Checks 'sysaccount_mod' command result

        :param extra_keys: iterable of additional keys expected in the
            result on top of ``update_keys``
        """
        assert_deepequal(dict(
            value=self.uid,
            summary=u'Modified system account "%s"' % self.uid,
            result=self.filter_attrs(self.update_keys | set(extra_keys))
        ), result)
|