File: test_coe_clusters_certificate.py

package info (click to toggle)
python-openstacksdk 4.4.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 13,352 kB
  • sloc: python: 122,960; sh: 153; makefile: 23
file content (99 lines) | stat: -rw-r--r-- 3,247 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


from openstack.container_infrastructure_management.v1 import (
    cluster_certificate,
)
from openstack.tests.unit import base

coe_cluster_ca_obj = dict(
    cluster_uuid="43e305ce-3a5f-412a-8a14-087834c34c8c",
    pem="-----BEGIN CERTIFICATE-----\nMIIDAO\n-----END CERTIFICATE-----\n",
    bay_uuid="43e305ce-3a5f-412a-8a14-087834c34c8c",
    links=[],
)

coe_cluster_signed_cert_obj = dict(
    cluster_uuid='43e305ce-3a5f-412a-8a14-087834c34c8c',
    pem='-----BEGIN CERTIFICATE-----\nMIIDAO\n-----END CERTIFICATE-----',
    bay_uuid='43e305ce-3a5f-412a-8a14-087834c34c8c',
    links=[],
    csr=(
        '-----BEGIN CERTIFICATE REQUEST-----\nMIICfz=='
        '\n-----END CERTIFICATE REQUEST-----\n'
    ),
)


class TestCOEClusters(base.TestCase):
    def _compare_cluster_certs(self, exp, real):
        self.assertDictEqual(
            cluster_certificate.ClusterCertificate(**exp).to_dict(
                computed=False
            ),
            real.to_dict(computed=False),
        )

    def get_mock_url(
        self,
        service_type='container-infrastructure-management',
        base_url_append=None,
        append=None,
        resource=None,
    ):
        return super().get_mock_url(
            service_type=service_type,
            resource=resource,
            append=append,
            base_url_append=base_url_append,
        )

    def test_get_coe_cluster_certificate(self):
        self.register_uris(
            [
                dict(
                    method='GET',
                    uri=self.get_mock_url(
                        resource='certificates',
                        append=[coe_cluster_ca_obj['cluster_uuid']],
                    ),
                    json=coe_cluster_ca_obj,
                )
            ]
        )
        ca_cert = self.cloud.get_coe_cluster_certificate(
            coe_cluster_ca_obj['cluster_uuid']
        )
        self._compare_cluster_certs(coe_cluster_ca_obj, ca_cert)
        self.assert_calls()

    def test_sign_coe_cluster_certificate(self):
        self.register_uris(
            [
                dict(
                    method='POST',
                    uri=self.get_mock_url(resource='certificates'),
                    json={
                        "cluster_uuid": coe_cluster_signed_cert_obj[
                            'cluster_uuid'
                        ],
                        "csr": coe_cluster_signed_cert_obj['csr'],
                    },
                )
            ]
        )
        self.cloud.sign_coe_cluster_certificate(
            coe_cluster_signed_cert_obj['cluster_uuid'],
            coe_cluster_signed_cert_obj['csr'],
        )
        self.assert_calls()