1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
|
#
# Copyright (C) 2016 FreeIPA Contributors see COPYING for license
#
from __future__ import absolute_import
import six
from lib389.utils import get_default_db_lib
from ipapython.dn import DN
from ipatests.test_xmlrpc.tracker.base import Tracker, EnableTracker
from ipatests.util import assert_deepequal
from ipatests.test_xmlrpc.xmlrpc_test import (
fuzzy_issuer,
fuzzy_caid,
fuzzy_base64,
fuzzy_sequence_of,
fuzzy_bytes,
)
from ipatests.test_xmlrpc import objectclasses
if six.PY3:
unicode = str
class CATracker(Tracker, EnableTracker):
    """Tracker for entries handled by the ``ca_*`` plugin commands.

    The tracker builds the ``ca_add`` / ``ca_show`` / ``ca_find`` /
    ``ca_mod`` / ``ca_disable`` / ``ca_del`` commands for one CA entry and
    keeps, in ``self.attrs``, the attributes that command results are
    compared against.
    """

    # Attributes read from the CA entry itself.
    ldap_keys = {
        'dn',
        'cn',
        'description',
        'ipacaid',
        'ipacasubjectdn',
        'ipacaissuerdn',
        'ipacarandomserialnumberversion',
    }
    # Certificate data expected in ``show`` / ``add`` results.
    cert_keys = {'certificate'}
    # Extra certificate data expected only when ``all`` is requested.
    cert_all_keys = {'certificate_chain'}

    # Key sets used by the ``check_*`` methods to filter ``self.attrs``.
    find_keys = ldap_keys
    find_all_keys = {'objectclass'} | ldap_keys
    retrieve_keys = ldap_keys | cert_keys
    retrieve_all_keys = {'objectclass'} | retrieve_keys | cert_all_keys
    create_keys = {'objectclass'} | retrieve_keys
    update_keys = ldap_keys - {'dn'}

    def __init__(self, name, subject, desc=u"Test generated CA",
                 default_version=None, auto_disable_for_delete=True):
        """Set up tracking for the CA called *name*.

        :param name: value of the ``cn`` RDN of the tracked entry
        :param subject: subject DN, sent to ``ca_add`` as ``ipacasubjectdn``
        :param desc: description sent to ``ca_add``
        :param default_version: forwarded to the parent initializer
        :param auto_disable_for_delete: when true, the delete command
            runs ``ca_disable`` before ``ca_del``
        """
        super().__init__(default_version=default_version)
        self.attrs = {}
        self.ipasubjectdn = subject
        self.description = desc
        # The entry lives directly under the CA container of the base DN.
        self.dn = DN(
            ('cn', name),
            self.api.env.container_ca,
            self.api.env.basedn,
        )
        # Whether to run ca-disable automatically before deleting the CA.
        self.auto_disable_for_delete = auto_disable_for_delete

    def make_create_command(self):
        """Return the ``ca_add`` command for the tracked CA."""
        return self.make_command(
            'ca_add',
            self.name,
            ipacasubjectdn=self.ipasubjectdn,
            description=self.description,
        )

    def check_create(self, result):
        """Compare *result* with the expected ``ca_add`` output."""
        expected = {
            'value': self.name,
            'summary': u'Created CA "{}"'.format(self.name),
            'result': dict(self.filter_attrs(self.create_keys)),
        }
        assert_deepequal(expected, result)

    def track_create(self):
        """Record the attributes expected once the CA has been created."""
        self.attrs = {
            'dn': unicode(self.dn),
            'cn': [self.name],
            'description': [self.description],
            'ipacasubjectdn': [self.ipasubjectdn],
            'ipacaissuerdn': [fuzzy_issuer],
            'ipacaid': [fuzzy_caid],
            'certificate': fuzzy_base64,
            'certificate_chain': fuzzy_sequence_of(fuzzy_bytes),
            'objectclass': objectclasses.ca,
        }
        # Only the entry described as 'IPA CA' is expected to carry the
        # random serial number version; the expected value depends on the
        # default database library ('0' for bdb, '3' otherwise).
        if self.description == 'IPA CA':
            rsn_version = '0' if get_default_db_lib() == 'bdb' else '3'
            self.attrs['ipacarandomserialnumberversion'] = (rsn_version,)
        self.exists = True

    def make_disable_command(self):
        """Return the ``ca_disable`` command for the tracked CA."""
        return self.make_command('ca_disable', self.name)

    def check_disable(self, result):
        """Compare *result* with the expected ``ca_disable`` output."""
        expected = {
            'result': True,
            'value': self.name,
            'summary': f'Disabled CA "{self.name}"',
        }
        assert_deepequal(expected, result)

    def make_delete_command(self):
        """Return a callable that deletes the tracked CA.

        With ``auto_disable_for_delete`` set, the callable first runs
        ``ca_disable`` and then returns the result of ``ca_del``;
        otherwise the plain ``ca_del`` command is returned.
        """
        if not self.auto_disable_for_delete:
            return self.make_command('ca_del', self.name)

        def disable_then_delete():
            # Both commands are built at call time, not when the
            # delete command is requested.
            self.make_command('ca_disable', self.name)()
            return self.make_command('ca_del', self.name)()

        return disable_then_delete

    def check_delete(self, result):
        """Compare *result* with the expected ``ca_del`` output."""
        expected = {
            'value': [self.name],
            'summary': u'Deleted CA "{}"'.format(self.name),
            'result': {'failed': []},
        }
        assert_deepequal(expected, result)

    def make_retrieve_command(self, all=False, raw=False, **options):
        """Return the ``ca_show`` command for the tracked CA.

        ``all``, ``raw`` and any extra *options* are passed through to the
        command.
        """
        return self.make_command(
            'ca_show', self.name, all=all, raw=raw, **options
        )

    def check_retrieve(self, result, all=False, raw=False):
        """Compare *result* with the expected ``ca_show`` output.

        *all* selects the larger key set; *raw* is accepted but not used
        by this check.
        """
        keys = self.retrieve_all_keys if all else self.retrieve_keys
        expected = {
            'value': self.name,
            'summary': None,
            'result': self.filter_attrs(keys),
        }
        assert_deepequal(expected, result)

    def make_find_command(self, *args, **kwargs):
        """Return a ``ca_find`` command built from the given arguments.

        The tracked name is not added automatically: the name (or other
        search terms) must be supplied in *args* / *kwargs*.
        """
        return self.make_command('ca_find', *args, **kwargs)

    def check_find(self, result, all=False, raw=False):
        """Compare *result* with a ``ca_find`` output matching one CA.

        *all* selects the larger key set; *raw* is accepted but not used
        by this check.
        """
        keys = self.find_all_keys if all else self.find_keys
        expected = {
            'count': 1,
            'truncated': False,
            'summary': u'1 CA matched',
            'result': [self.filter_attrs(keys)],
        }
        assert_deepequal(expected, result)

    def make_update_command(self, updates):
        """Return the ``ca_mod`` command applying the *updates* mapping."""
        return self.make_command('ca_mod', self.name, **updates)

    def check_update(self, result, extra_keys=()):
        """Compare *result* with the expected ``ca_mod`` output.

        *extra_keys* names additional attributes, beyond ``update_keys``,
        that the result is expected to contain.
        """
        wanted_keys = self.update_keys | set(extra_keys)
        expected = {
            'value': self.name,
            'summary': u'Modified CA "{}"'.format(self.name),
            'result': self.filter_attrs(wanted_keys),
        }
        assert_deepequal(expected, result)
|