File: https_cert.py

package info (click to toggle)
python-proliantutils 2.16.3-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,684 kB
  • sloc: python: 58,655; makefile: 163; sh: 2
file content (132 lines) | stat: -rw-r--r-- 4,371 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# Copyright 2020 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

__author__ = 'HPE'

import sushy
from sushy.resources import base
from sushy.resources import common as sushy_common

from proliantutils import exception
from proliantutils.ilo import common
from proliantutils import log

LOG = log.get_logger(__name__)


class ActionsField(base.CompositeField):
    generate_csr = (sushy_common.
                    ResetActionField('#HpeHttpsCert.GenerateCSR'))
    import_cert = (sushy_common.
                   ResetActionField('#HpeHttpsCert.ImportCertificate'))


class HttpsCert(base.ResourceBase):

    cert_sign_request = base.Field('CertificateSigningRequest', required=True)
    _actions = ActionsField(['Actions'], required=True)

    def _get_https_cert_uri(self, cert_func):
        """Get the url for generating CSR

        :returns: generate csr url
        :raises: Missing resource error on missing url
        """
        url = self._actions[cert_func]
        if not url:
            if cert_func == 'generate_csr':
                missing_action = '#HpeHttpsCert.GenerateCSR'
            else:
                missing_action = '#HpeHttpsCert.ImportCertificate'
            raise (sushy.exceptions.
                   MissingActionError(action=missing_action,
                                      resource=self._path))
        return url

    def import_certificate(self, cert):
        """Adds the signed https certificate to the iLO.

        :param certificate: Signed HTTPS certificate.
        :raises: IloError, on an error from iLO.
        """
        action_data = {
            "Certificate": cert
        }

        target_uri = self._get_https_cert_uri('import_cert').target_uri
        try:
            self._conn.post(target_uri, data=action_data)
        except sushy.exceptions.SushyError as e:
            msg = (('The Redfish controller failed to import '
                    'certificate. Error: %(error)s') %
                   {'error': str(e)})
            LOG.debug(msg)  # noqa
            raise exception.IloError(msg)

    def generate_csr(self, csr_params):
        """Generates the certificate signing request.

        :param csr_params: A dictionary containing all the necessary
               information required to create CSR.
        :returns: Created CSR.
        :raises: IloError, on an error from iLO.
        """
        target_uri = self._get_https_cert_uri('generate_csr').target_uri
        try:
            self._conn.post(target_uri, data=csr_params)
        except sushy.exceptions.SushyError as e:
            msg = (('The Redfish controller failed to generate CSR '
                    'with given information. Error: %(error)s') %
                   {'error': str(e)})
            LOG.debug(msg)  # noqa
            raise exception.IloError(msg)

        self.wait_for_csr_to_create()

        csr = self.cert_sign_request

        return csr

    def wait_for_csr_to_create(self):
        """Continuosly polls for CSR to create.

        """

        def has_csr_created():
            """Check whether CSR is created.

            :returns: True upon successful creation of CSR otherwise False.
            """
            self.get_generate_csr_refresh()

            csr = self.cert_sign_request

            if not csr:
                return False
            return True

        common.wait_for_operation_to_complete(
            has_csr_created, delay_bw_retries=30,
            failover_msg='Generating CSR has failed.'
        )

    def get_generate_csr_refresh(self):
        # perform refresh
        try:
            self.refresh()
        except sushy.exceptions.SushyError as e:
            msg = (('Generating CSR progress not known. '
                    'Error: %(error)s') %
                   {'error': str(e)})
            LOG.debug(msg)