File: base.py

package info (click to toggle)
horizon 3%3A25.5.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 33,016 kB
  • sloc: python: 100,186; javascript: 49,520; sh: 440; makefile: 79
file content (289 lines) | stat: -rw-r--r-- 12,241 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import abc
import logging
import re

from django.utils.translation import gettext_lazy as _
from keystoneauth1 import exceptions as keystone_exceptions
from keystoneclient.v3 import client as v3_client

from openstack_auth import exceptions
from openstack_auth import utils

LOG = logging.getLogger(__name__)
__all__ = ['BasePlugin']


class BasePlugin(object, metaclass=abc.ABCMeta):
    """Base plugin to provide ways to log in to dashboard.

    Provides a framework for keystoneclient plugins that can be used with the
    information provided to return an unscoped token.
    """

    @abc.abstractmethod
    def get_plugin(self, auth_url=None, **kwargs):
        """Create a new plugin to attempt to authenticate.

        Given the information provided by the login providers attempt to create
        an authentication plugin that can be used to authenticate the user.

        If the provided login information does not contain enough information
        for this plugin to proceed then it should return None.

        :param str auth_url: The URL to authenticate against.

        :returns: A plugin that will be used to authenticate or None if the
                  plugin cannot authenticate with the data provided.
        :rtype: keystoneclient.auth.BaseAuthPlugin
        """
        return None

    @property
    def keystone_version(self):
        """Identity API version reported by utils.get_keystone_version()."""
        version = utils.get_keystone_version()
        return version

    def list_projects(self, session, auth_plugin, auth_ref=None):
        """List the projects that are accessible to this plugin.

        Query the keystone server for all projects that this authentication
        token can be rescoped to. Federated users are looked up through the
        federation API, everybody else through the regular projects API
        filtered by user id.

        This function is overrideable by plugins if they use a non-standard
        mechanism to determine projects.

        :param session: A session object for communication.
        :type session: keystoneclient.session.Session
        :param auth_plugin: The auth plugin returned by :py:meth:`get_plugin`.
        :type auth_plugin: keystoneclient.auth.BaseAuthPlugin
        :param auth_ref: The current authentication data. This is optional;
                         when it is omitted the access info is requested
                         from ``auth_plugin`` instead.
        :type auth_ref: keystoneclient.access.AccessInfo or None

        :raises: exceptions.KeystoneRetrieveProjectsException on lookup
                 failure.

        :returns: A list of projects. This currently accepts returning both v2
                  or v3 keystoneclient projects objects.
        """
        try:
            client = v3_client.Client(session=session, auth=auth_plugin)
            if auth_ref is None:
                # auth_ref is optional in the signature, but the lookup below
                # needs it; ask the plugin for the access info rather than
                # failing with AttributeError on None.
                auth_ref = auth_plugin.get_access(session)
            if auth_ref.is_federated:
                return client.federation.projects.list()
            return client.projects.list(user=auth_ref.user_id)

        except (keystone_exceptions.ClientException,
                keystone_exceptions.AuthorizationFailure) as exc:
            msg = _('Unable to retrieve authorized projects.')
            # Keep the keystone error attached as the explicit cause.
            raise exceptions.KeystoneRetrieveProjectsException(msg) from exc

    def list_domains(self, session, auth_plugin, auth_ref=None):
        """List the domains reported by keystone for this authentication.

        Builds a keystone v3 client from the session and auth plugin and
        returns the result of its ``auth.domains()`` call.

        :param session: A session object for communication.
        :param auth_plugin: The auth plugin used to build the client.
        :param auth_ref: Not used by this implementation.

        :raises: exceptions.KeystoneRetrieveDomainsException when keystone
                 raises a ClientException or AuthorizationFailure.

        :returns: Whatever ``client.auth.domains()`` returns.
        """
        try:
            keystone = v3_client.Client(session=session, auth=auth_plugin)
            available = keystone.auth.domains()
        except (keystone_exceptions.ClientException,
                keystone_exceptions.AuthorizationFailure):
            failure = _('Unable to retrieve authorized domains.')
            raise exceptions.KeystoneRetrieveDomainsException(failure)
        else:
            return available

    def get_access_info(self, keystone_auth, session=None):
        """Get the access info from an unscoped auth

        This function provides the base functionality that the
        plugins will use to authenticate and get the access info object.

        :param keystone_auth: keystoneauth1 identity plugin
        :param session: keystoneauth1 session to use otherwise gets one
        :raises: exceptions.KeystoneAuthException on auth failure
        :returns: keystoneclient.access.AccessInfo
        """
        if session is None:
            session = utils.get_session()

        try:
            unscoped_auth_ref = keystone_auth.get_access(session)
        except keystone_exceptions.ConnectFailure as exc:
            LOG.error(str(exc))
            msg = _('Unable to establish connection to keystone endpoint.')
            raise exceptions.KeystoneConnectionException(msg)
        except (keystone_exceptions.Unauthorized,
                keystone_exceptions.Forbidden,
                keystone_exceptions.NotFound) as exc:
            msg = str(exc)
            LOG.debug(msg)
            match = re.match(r"The password is expired and needs to be changed"
                             r" for user: ([^.]*)[.].*", msg)
            if match:
                exc = exceptions.KeystonePassExpiredException(
                    _('Password expired.'))
                exc.user_id = match.group(1)
                raise exc
            msg = _('Invalid credentials.')
            raise exceptions.KeystoneCredentialsException(msg)
        except (keystone_exceptions.MissingAuthMethods) as exc:
            msg = str(exc)
            LOG.debug(msg)
            try:
                for required in exc.required_auth_methods:
                    if (len(required) == 2 and
                            'totp' in required and
                            'password' in required):
                        receipt = exc.receipt
                        msg = _('Authentication via TOTP is required.')
                        exc = exceptions.KeystoneTOTPRequired(msg)
                        exc.receipt = receipt
                        raise exc
                msg = _("An error occurred authenticating. "
                        "Please try again later.")
                raise exceptions.KeystoneAuthException(msg)
            except Exception as exc:
                raise exc
        except (keystone_exceptions.ClientException,
                keystone_exceptions.AuthorizationFailure) as exc:
            msg = _("An error occurred authenticating. "
                    "Please try again later.")
            LOG.debug(str(exc))
            raise exceptions.KeystoneAuthException(msg)
        return unscoped_auth_ref

    def get_project_scoped_auth(self, unscoped_auth, unscoped_auth_ref,
                                recent_project=None, session=None):
        """Get the project scoped keystone auth and access info

        This function returns a project scoped keystone token plugin
        and AccessInfo object.

        :param unscoped_auth: keystone auth plugin
        :param unscoped_auth_ref: keystoneclient.access.AccessInfo` or None.
        :param recent_project: project that we should try to scope to
        :param session: keystoneauth1 session to use otherwise gets one
        :return: keystone token auth plugin, AccessInfo object
        """
        if session is None:
            session = utils.get_session()

        auth_url = unscoped_auth.auth_url

        projects = self.list_projects(
            session, unscoped_auth, unscoped_auth_ref)
        # Attempt to scope only to enabled projects
        projects = [project for project in projects if project.enabled]

        # if a most recent project was found, try using it first
        if recent_project:
            for pos, project in enumerate(projects):
                if project.id == recent_project:
                    # move recent project to the beginning
                    projects.pop(pos)
                    projects.insert(0, project)
                    break

        scoped_auth = None
        scoped_auth_ref = None
        for project in projects:
            token = unscoped_auth_ref.auth_token
            scoped_auth = utils.get_token_auth_plugin(auth_url,
                                                      token=token,
                                                      project_id=project.id)
            try:
                scoped_auth_ref = scoped_auth.get_access(session)
            except (keystone_exceptions.ClientException,
                    keystone_exceptions.AuthorizationFailure):
                LOG.info('Attempted scope to project %s failed, will attempt '
                         'to scope to another project.', project.name)
            else:
                break

        return scoped_auth, scoped_auth_ref

    def get_domain_scoped_auth(self, unscoped_auth, unscoped_auth_ref,
                               domain_name=None, session=None):
        """Get the domain scoped keystone auth and access info

        This function returns a domain scoped keystone token plugin
        and AccessInfo object.

        :param unscoped_auth: keystone auth plugin
        :param unscoped_auth_ref: keystoneclient.access.AccessInfo` or None.
        :param domain_name: domain that we should try to scope to
        :param session: keystoneauth1 session to use otherwise gets one
        :return: keystone token auth plugin, AccessInfo object
        """
        if session is None:
            session = utils.get_session()

        auth_url = unscoped_auth.auth_url

        if domain_name:
            domains = [domain_name]
        else:
            domains = self.list_domains(session,
                                        unscoped_auth,
                                        unscoped_auth_ref)
            domains = [domain.name for domain in domains if domain.enabled]

        # domain support can require domain scoped tokens to perform
        # identity operations depending on the policy files being used
        # for keystone.
        domain_auth = None
        domain_auth_ref = None
        for _name in domains:
            token = unscoped_auth_ref.auth_token
            domain_auth = utils.get_token_auth_plugin(
                auth_url,
                token,
                domain_name=_name)
            try:
                domain_auth_ref = domain_auth.get_access(session)
            except (keystone_exceptions.ClientException,
                    keystone_exceptions.AuthorizationFailure):
                LOG.info('Attempted scope to domain %s failed, will attempt '
                         'to scope to another domain.', _name)
            else:
                if len(domains) > 1:
                    LOG.info("More than one valid domain found for user %s,"
                             " scoping to %s",
                             unscoped_auth_ref.user_id, _name)
                break
        return domain_auth, domain_auth_ref

    def get_system_scoped_auth(self, unscoped_auth, unscoped_auth_ref,
                               system_scope, session=None):
        """Scope the unscoped token to the given system scope.

        Builds a token auth plugin for ``system_scope`` and requests its
        access info. Errors raised by that request are not caught here.

        :param unscoped_auth: keystone auth plugin
        :param unscoped_auth_ref: keystoneclient.access.AccessInfo or None.
        :param system_scope: system that we should try to scope to
        :param session: keystoneauth1 session to use otherwise gets one
        :return: keystone token auth plugin, AccessInfo object
        """
        session = utils.get_session() if session is None else session
        keystone_url = unscoped_auth.auth_url
        bearer = unscoped_auth_ref.auth_token

        system_auth = utils.get_token_auth_plugin(
            keystone_url, bearer, system_scope=system_scope)
        return system_auth, system_auth.get_access(session)