File: credentials.py

package info (click to toggle)
python-tempestconf 3.5.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 964 kB
  • sloc: python: 4,530; makefile: 18; sh: 9
file content (174 lines) | stat: -rw-r--r-- 6,872 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# Copyright 2016, 2018 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import requests
from tempest.lib import auth

from config_tempest import utils


class Credentials(object):
    """Class contains all needed credentials.

    Wrapps credentials obtained from TempestConf object and Tempest
    credentialsfrom auth library.
    """
    def __init__(self, conf, admin, **kwargs):
        """Init method of Credentials.

        :type conf: TempestConf object
        :param admin: True if the user is admin, False otherwise
        :type admin: Boolean
        """
        self.admin = admin
        self._conf = conf
        self.verify = kwargs.get('verify', True)
        self.cert = kwargs.get('cert', None)
        self.username = self.get_credential('username')
        self.password = self.get_credential('password')
        self.project_name = self.get_credential('project_name')
        self.domain_name = self.get_credential('domain_name')
        self.user_domain_name = self.get_credential('user_domain_name')
        self.project_domain_name = self.get_credential('project_domain_name')
        self.identity_version = self._get_identity_version()
        self.api_version = 3 if self.identity_version == "v3" else 2
        self.identity_region = self._conf.get_defaulted('identity', 'region')
        self.set_ssl_certificate_validation()
        self.set_credentials()

    def set_ssl_certificate_validation(self):
        # is there a specific CA bundle to use?
        # self.verify is either a boolean, in which case it controls whether
        # server's TLS certificates are verified, or a string, in which case
        # it is a path to a CA bundle to use, default in requests package
        # is True.
        if isinstance(self.verify, str):
            self.disable_ssl_certificate_validation = False
            self.ca_certs = self.verify
            self._conf.set('identity', 'ca_certificates_file', self.ca_certs)
        else:
            self.disable_ssl_certificate_validation = self._conf.get_defaulted(
                'identity', 'disable_ssl_certificate_validation'
            )
            self.ca_certs = self._conf.get_defaulted('identity',
                                                     'ca_certificates_file')
        self._conf.set('identity', 'disable_ssl_certificate_validation',
                       str(self.disable_ssl_certificate_validation))

    def get_ssl_certificate_validation(self):
        return {
            'disable_ssl_certificate_validation':
                self.disable_ssl_certificate_validation,
            'ca_certs': self.ca_certs,
        }

    def get_credential(self, key):
        """Helper for getting credential by its name.

        :param key: credential name
        :type key: string
        :returns: credential
        :rtype: string
        """
        if self.admin:
            # admin credentials are stored in auth section
            # and are prefixed by 'admin_'
            return self._conf.get_defaulted('auth', 'admin_' + key)
        else:
            # Tempest doesn't have non admin credentials, but the
            # tool keeps them in identity section for further usage
            return self._conf.get_defaulted('identity', key)

    def _list_versions(self, base_url, **kwargs):
        resp = requests.get(base_url, **kwargs)
        data = resp.json()
        return data["versions"]["values"]

    def _get_identity_version(self):
        """Looks for identity version in TempestConf object.

        :returns: identity version
        :rtype: string
        """
        base_url = utils.get_base_url(self._conf.get("identity", "uri"))
        kwargs = {
            'verify': self.verify,
            'cert': self.cert,
        }
        versions = self._list_versions(base_url, **kwargs)
        for version in versions:
            if version["status"] == "stable" and "v3" in version["id"]:
                return "v3"
        return "v2"

    def _get_creds_kwargs(self):
        """Creates kwargs.

        Kwargs based on the identity version, for obtaining
        Tempest credentials.

        :returns: kwargs
        :rtype: dict
        """
        creds_kwargs = {'username': self.username,
                        'password': self.password}
        if self.identity_version == 'v3':
            creds_kwargs.update(
                {
                    'project_name': self.project_name,
                    'domain_name': self.domain_name,
                    'project_domain_name': self.project_domain_name,
                    'user_domain_name': self.user_domain_name
                }
            )
        else:
            creds_kwargs.update({'project_name': self.project_name})
        return creds_kwargs

    def set_credentials(self):
        """Gets and saves Tempest credentials from auth library."""
        creds_kwargs = self._get_creds_kwargs()
        disable_ssl = self.disable_ssl_certificate_validation
        self.tempest_creds = auth.get_credentials(
            auth_url=None,
            fill_in=False,
            identity_version=self.identity_version,
            disable_ssl_certificate_validation=disable_ssl,
            ca_certs=self.ca_certs,
            **creds_kwargs)

    def get_auth_provider(self):
        """Gets auth provider based on the type of Tempest credentials.

        :returns: auth provider
        :rtype: auth.KeystoneV2AuthProvider/auth.KeystoneV3AuthProvider
        """
        if isinstance(self.tempest_creds, auth.KeystoneV3Credentials):
            # We set uri and uri_v3 to /v3 here because if the endpoint on the
            # rc file don't set the /v3 it will fail with a error 404
            uri = self._conf.get_defaulted('identity', 'uri_v3')
            uri = utils.get_base_url(uri) + 'v3'
            self._conf.set('identity', 'uri_v3', uri)
            return auth.KeystoneV3AuthProvider(
                self.tempest_creds,
                self._conf.get_defaulted('identity', 'uri_v3'),
                self.disable_ssl_certificate_validation,
                self.ca_certs)
        else:
            return auth.KeystoneV2AuthProvider(
                self.tempest_creds,
                self._conf.get_defaulted('identity', 'uri'),
                self.disable_ssl_certificate_validation,
                self.ca_certs)