File: ca_plugin.py

package info (click to toggle)
freeipa 4.13.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 367,240 kB
  • sloc: javascript: 562,763; python: 310,289; ansic: 49,809; sh: 7,176; makefile: 2,589; xml: 343; sed: 16
file content (170 lines) | stat: -rw-r--r-- 5,708 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#
# Copyright (C) 2016  FreeIPA Contributors see COPYING for license
#
from __future__ import absolute_import

import six
from lib389.utils import get_default_db_lib

from ipapython.dn import DN
from ipatests.test_xmlrpc.tracker.base import Tracker, EnableTracker
from ipatests.util import assert_deepequal
from ipatests.test_xmlrpc.xmlrpc_test import (
    fuzzy_issuer,
    fuzzy_caid,
    fuzzy_base64,
    fuzzy_sequence_of,
    fuzzy_bytes,
)
from ipatests.test_xmlrpc import objectclasses


if six.PY3:
    unicode = str


class CATracker(Tracker, EnableTracker):
    """Implementation of a Tracker class for CA plugin."""

    ldap_keys = {
        'dn', 'cn', 'ipacaid', 'ipacasubjectdn', 'ipacaissuerdn',
        'description', 'ipacarandomserialnumberversion',
    }
    cert_keys = {
        'certificate',
    }
    cert_all_keys = {
        'certificate_chain',
    }
    find_keys = ldap_keys
    find_all_keys = {'objectclass'} | ldap_keys
    retrieve_keys = ldap_keys | cert_keys
    retrieve_all_keys = {'objectclass'} | retrieve_keys | cert_all_keys
    create_keys = {'objectclass'} | retrieve_keys
    update_keys = ldap_keys - {'dn'}

    def __init__(self, name, subject, desc=u"Test generated CA",
                 default_version=None, auto_disable_for_delete=True):
        super(CATracker, self).__init__(default_version=default_version)
        self.attrs = {}
        self.ipasubjectdn = subject
        self.description = desc

        self.dn = DN(('cn', name),
                     self.api.env.container_ca,
                     self.api.env.basedn)

        # Whether to run ca-disable automatically before deleting the CA.
        self.auto_disable_for_delete = auto_disable_for_delete

    def make_create_command(self):
        """Make function that creates the plugin entry object."""
        return self.make_command(
            'ca_add', self.name, ipacasubjectdn=self.ipasubjectdn,
            description=self.description
        )

    def check_create(self, result):
        assert_deepequal(dict(
            value=self.name,
            summary=u'Created CA "{}"'.format(self.name),
            result=dict(self.filter_attrs(self.create_keys))
        ), result)

    def track_create(self):
        self.attrs = dict(
            dn=unicode(self.dn),
            cn=[self.name],
            description=[self.description],
            ipacasubjectdn=[self.ipasubjectdn],
            ipacaissuerdn=[fuzzy_issuer],
            ipacaid=[fuzzy_caid],
            certificate=fuzzy_base64,
            certificate_chain=fuzzy_sequence_of(fuzzy_bytes),
            objectclass=objectclasses.ca
        )
        if self.description == 'IPA CA':
            if get_default_db_lib() == 'bdb':
                self.attrs['ipacarandomserialnumberversion'] = ('0',)
            else:
                self.attrs['ipacarandomserialnumberversion'] = ('3',)
        self.exists = True

    def make_disable_command(self):
        return self.make_command('ca_disable', self.name)

    def check_disable(self, result):
        assert_deepequal(dict(
            result=True,
            value=self.name,
            summary=f'Disabled CA "{self.name}"',
        ), result)

    def make_delete_command(self):
        """Make function that deletes the plugin entry object."""
        if self.auto_disable_for_delete:
            def disable_then_delete():
                self.make_command('ca_disable', self.name)()
                return self.make_command('ca_del', self.name)()
            return disable_then_delete
        else:
            return self.make_command('ca_del', self.name)

    def check_delete(self, result):
        assert_deepequal(dict(
            value=[self.name],
            summary=u'Deleted CA "{}"'.format(self.name),
            result=dict(failed=[])
        ), result)

    def make_retrieve_command(self, all=False, raw=False, **options):
        """Make function that retrieves the entry using ${CMD}_show"""
        return self.make_command('ca_show', self.name, all=all, raw=raw,
                                 **options)

    def check_retrieve(self, result, all=False, raw=False):
        """Check the plugin's `show` command result"""
        if all:
            expected = self.filter_attrs(self.retrieve_all_keys)
        else:
            expected = self.filter_attrs(self.retrieve_keys)

        assert_deepequal(dict(
            value=self.name,
            summary=None,
            result=expected
        ), result)

    def make_find_command(self, *args, **kwargs):
        """Make function that finds the entry using ${CMD}_find

        Note that the name (or other search terms) needs to be specified
        in arguments.
        """
        return self.make_command('ca_find', *args, **kwargs)

    def check_find(self, result, all=False, raw=False):
        """Check the plugin's `find` command result"""
        if all:
            expected = self.filter_attrs(self.find_all_keys)
        else:
            expected = self.filter_attrs(self.find_keys)

        assert_deepequal(dict(
            count=1,
            truncated=False,
            summary=u'1 CA matched',
            result=[expected]
        ), result)

    def make_update_command(self, updates):
        """Make function that modifies the entry using ${CMD}_mod"""
        return self.make_command('ca_mod', self.name, **updates)

    def check_update(self, result, extra_keys=()):
        """Check the plugin's `find` command result"""
        assert_deepequal(dict(
            value=self.name,
            summary=u'Modified CA "{}"'.format(self.name),
            result=self.filter_attrs(self.update_keys | set(extra_keys))
        ), result)