File: plugin.py

package info (click to toggle)
python-keystoneauth1 5.12.0-2
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 2,280 kB
  • sloc: python: 23,506; xml: 285; makefile: 93; sh: 2
file content (465 lines) | stat: -rw-r--r-- 17,620 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import typing as ty

import typing_extensions as ty_ext

from keystoneauth1 import discover

if ty.TYPE_CHECKING:
    from keystoneauth1.access import access
    from keystoneauth1 import session as ks_session

# NOTE(jamielennox): The AUTH_INTERFACE is a special value that can be
# requested from get_endpoint. If a plugin receives this as the value of
# 'interface' it should return the initial URL that was passed to the plugin.
AUTH_INTERFACE = object()

IDENTITY_AUTH_HEADER_NAME = 'X-Auth-Token'


BaseAuthPluginT = ty.TypeVar(
    'BaseAuthPluginT', bound='BaseAuthPlugin', covariant=True
)


class ConnectionParams(ty.TypedDict):
    # https://github.com/python/typeshed/blob/24c78b9e0/stubs/requests/requests/sessions.pyi#L82
    cert: ty_ext.NotRequired[str | tuple[str, str] | None]
    # https://github.com/python/typeshed/blob/24c78b9e0/stubs/requests/requests/sessions.pyi#L108
    verify: ty_ext.NotRequired[bool | str | None]


class BaseAuthPlugin:
    """The basic structure of an authentication plugin.

    .. note::
        See :doc:`/authentication-plugins` for a description of plugins
        provided by this library.

    """

    def __init__(self) -> None:
        self._discovery_cache: dict[str, discover.Discover] = {}

    def get_token(self, session: 'ks_session.Session') -> str | None:
        """Obtain a token.

        How the token is obtained is up to the plugin. If it is still valid
        it may be re-used, retrieved from cache or invoke an authentication
        request against a server.

        Returning None will indicate that no token was able to be retrieved.

        This function is misplaced as it should only be required for auth
        plugins that use the 'X-Auth-Token' header. However due to the way
        plugins evolved this method is required and often called to trigger an
        authentication request on a new plugin.

        When implementing a new plugin it is advised that you implement this
        method, however if you don't require the 'X-Auth-Token' header override
        the `get_headers` method instead.

        :param session: A session object so the plugin can make HTTP calls.
        :type session: keystoneauth1.session.Session

        :return: A token to use.
        :rtype: string
        """
        return None

    def get_auth_ref(
        self, session: 'ks_session.Session'
    ) -> ty.Optional['access.AccessInfo']:
        """Return the authentication reference of an auth plugin.

        :param session: A session object to be used for communication
        :type session: keystoneauth1.session.session

        :return: An access info object if supported.
        :rtype: `keystoneauth1.access.AccessInfo` or None
        """
        return None

    def get_headers(
        self, session: 'ks_session.Session'
    ) -> dict[str, str] | None:
        """Fetch authentication headers for message.

        This is a more generalized replacement of the older get_token to allow
        plugins to specify different or additional authentication headers to
        the OpenStack standard 'X-Auth-Token' header.

        How the authentication headers are obtained is up to the plugin. If the
        headers are still valid they may be re-used, retrieved from cache or
        the plugin may invoke an authentication request against a server.

        The default implementation of get_headers calls the `get_token` method
        to enable older style plugins to continue functioning unchanged.
        Subclasses should feel free to completely override this function to
        provide the headers that they want.

        Returning None will indicate that no token was able to be retrieved and
        that authorization was a failure. Adding no authentication data can be
        achieved by returning an empty dictionary.

        :param session: The session object that the auth_plugin belongs to.
        :type session: keystoneauth1.session.Session

        :returns: Headers that are set to authenticate a message or None for
                  failure. Note that when checking this value that the empty
                  dict is a valid, non-failure response.
        :rtype: dict
        """
        token = self.get_token(session)

        if not token:
            return None

        return {IDENTITY_AUTH_HEADER_NAME: token}

    def get_endpoint_data(
        self,
        session: 'ks_session.Session',
        *,
        endpoint_override: str | None = None,
        discover_versions: bool = True,
        **kwargs: ty.Any,
    ) -> discover.EndpointData | None:
        """Return a valid endpoint data for a service.

        :param session: A session object that can be used for communication.
        :type session: keystoneauth1.session.Session
        :param str endpoint_override: URL to use for version discovery.
        :param bool discover_versions: Whether to get version metadata from
                                       the version discovery document even
                                       if it major api version info can be
                                       inferred from the url.
                                       (optional, defaults to True)
        :param kwargs: Ignored.

        :raises keystoneauth1.exceptions.http.HttpError: An error from an
                                                         invalid HTTP response.

        :return: Valid EndpointData or None if not available.
        :rtype: `keystoneauth1.discover.EndpointData` or None
        """
        if not endpoint_override:
            return None
        endpoint_data = discover.EndpointData(catalog_url=endpoint_override)

        if endpoint_data.api_version and not discover_versions:
            return endpoint_data

        return endpoint_data.get_versioned_data(
            session,
            cache=self._discovery_cache,
            discover_versions=discover_versions,
        )

    def get_api_major_version(
        self,
        session: 'ks_session.Session',
        *,
        endpoint_override: str | None = None,
        **kwargs: ty.Any,
    ) -> tuple[int | float, ...] | None:
        """Get the major API version from the endpoint.

        :param session: A session object that can be used for communication.
        :type session: keystoneauth1.session.Session
        :param str endpoint_override: URL to use for version discovery.
        :param kwargs: Ignored.

        :raises keystoneauth1.exceptions.http.HttpError: An error from an
                                                         invalid HTTP response.

        :return: Valid EndpointData or None if not available.
        :rtype: `keystoneauth1.discover.EndpointData` or None
        """
        endpoint_data = self.get_endpoint_data(
            session,
            endpoint_override=endpoint_override,
            discover_versions=False,
            **kwargs,
        )
        if endpoint_data is None:
            return None

        if endpoint_data.api_version is None:
            # No version detected from the URL, trying full discovery.
            endpoint_data = self.get_endpoint_data(
                session,
                endpoint_override=endpoint_override,
                discover_versions=True,
                **kwargs,
            )

        if endpoint_data and endpoint_data.api_version:
            return endpoint_data.api_version

        return None

    def get_all_version_data(
        self,
        session: 'ks_session.Session',
        interface: str | list[str] | None = 'public',
        region_name: str | None = None,
        service_type: str | None = None,
    ) -> dict[str, dict[str, dict[str, list[discover.VersionData]]]]:
        """Get version data for all services in the catalog.

        :param session: A session object that can be used for communication.
        :type session: keystoneauth1.session.Session
        :param interface:
            Type of endpoint to get version data for. Can be a single value
            or a list of values. A value of None indicates that all interfaces
            should be queried. (optional, defaults to public)
        :param string region_name:
            Region of endpoints to get version data for. A valueof None
            indicates that all regions should be queried. (optional, defaults
            to None)
        :param string service_type:
            Limit the version data to a single service. (optional, defaults
            to None)

        :returns: A dictionary keyed by region_name with values containing
            dictionaries keyed by interface with values being a list of
            :class:`~keystoneauth1.discover.VersionData`.
        """
        raise NotImplementedError()

    def get_endpoint(
        self, session: 'ks_session.Session', **kwargs: ty.Any
    ) -> str | None:
        """Return an endpoint for the client.

        There are no required keyword arguments to ``get_endpoint`` as a plugin
        implementation should use best effort with the information available to
        determine the endpoint. However there are certain standard options that
        will be generated by the clients and should be used by plugins:

        - ``service_type``: what sort of service is required.
        - ``service_name``: the name of the service in the catalog.
        - ``interface``: what visibility the endpoint should have.
        - ``region_name``: the region the endpoint exists in.

        :param session: The session object that the auth_plugin belongs to.
        :type session: keystoneauth1.session.Session
        :param kwargs: Ignored.

        :returns: The base URL that will be used to talk to the required
                  service or None if not available.
        :rtype: string
        """
        endpoint_data = self.get_endpoint_data(
            session, discover_versions=False, **kwargs
        )
        if not endpoint_data:
            return None
        return endpoint_data.url

    def get_connection_params(
        self, session: 'ks_session.Session'
    ) -> ConnectionParams:
        """Return any additional connection parameters required for the plugin.

        :param session: The session object that the auth_plugin belongs to.
        :type session: keystoneauth1.session.Session

        :returns: Parameters that are passed to the requests library. Only the
                  ``cert`` and ``verify`` parameters may be returned.
        :rtype: dict
        """
        return {}

    def invalidate(self) -> bool:
        """Invalidate the current authentication data.

        This should result in fetching a new token on next call.

        A plugin may be invalidated if an Unauthorized HTTP response is
        returned to indicate that the token may have been revoked or is
        otherwise now invalid.

        :returns: True if there was something that the plugin did to
                  invalidate. This means that it makes sense to try again. If
                  nothing happens returns False to indicate give up.
        :rtype: bool
        """
        return False

    def get_user_id(self, session: 'ks_session.Session') -> str | None:
        """Return a unique user identifier of the plugin.

        Wherever possible the user id should be inferred from the token however
        there are certain URLs and other places that require access to the
        currently authenticated user id.

        :param session: A session object so the plugin can make HTTP calls.
        :type session: keystoneauth1.session.Session

        :returns: A user identifier or None if one is not available.
        :rtype: str
        """
        return None

    def get_project_id(self, session: 'ks_session.Session') -> str | None:
        """Return the project id that we are authenticated to.

        Wherever possible the project id should be inferred from the token
        however there are certain URLs and other places that require access to
        the currently authenticated project id.

        :param session: A session object so the plugin can make HTTP calls.
        :type session: keystoneauth1.session.Session

        :returns: A project identifier or None if one is not available.
        :rtype: str
        """
        return None

    def get_sp_auth_url(
        self, session: 'ks_session.Session', sp_id: str
    ) -> str | None:
        """Return auth_url from the Service Provider object.

        This url is used for obtaining unscoped federated token from remote
        cloud.

        :param sp_id: ID of the Service Provider to be queried.
        :type sp_id: string

        :returns: A Service Provider auth_url or None if one is not available.
        :rtype: str

        """
        return None

    def get_sp_url(
        self, session: 'ks_session.Session', sp_id: str
    ) -> str | None:
        """Return sp_url from the Service Provider object.

        This url is used for passing SAML2 assertion to the remote cloud.

        :param sp_id: ID of the Service Provider to be queried.
        :type sp_id: str

        :returns: A Service Provider sp_url or None if one is not available.
        :rtype: str

        """
        return None

    def get_cache_id(self) -> str | None:
        """Fetch an identifier that uniquely identifies the auth options.

        The returned identifier need not be decomposable or otherwise provide
        anyway to recreate the plugin. It should not contain sensitive data in
        plaintext.

        This string MUST change if any of the parameters that are used to
        uniquely identity this plugin change.

        If get_cache_id returns a str value suggesting that caching is
        supported then get_auth_cache and set_auth_cache must also be
        implemented.

        :returns: A unique string for the set of options
        :rtype: str or None if this is unsupported or unavailable.
        """
        return None

    def get_auth_state(self) -> object:
        """Retrieve the current authentication state for the plugin.

        Retrieve any internal state that represents the authenticated plugin.

        This should not fetch any new data if it is not present.

        :raises NotImplementedError: if the plugin does not support this
            feature.

        :returns: raw python data (which can be JSON serialized) that can be
                  moved into another plugin (of the same type) to have the
                  same authenticated state.
        :rtype: object or None if unauthenticated.
        """
        raise NotImplementedError()

    def set_auth_state(self, data: str) -> None:
        """Install existing authentication state for a plugin.

        Take the output of get_auth_state and install that authentication state
        into the current authentication plugin.

        :raises NotImplementedError: if the plugin does not support this
            feature.
        """
        raise NotImplementedError()


class FixedEndpointPlugin(BaseAuthPlugin):
    """A base class for plugins that have one fixed endpoint."""

    def __init__(self, endpoint: str | None = None):
        super().__init__()
        self.endpoint = endpoint

    def get_endpoint(
        self,
        session: 'ks_session.Session',
        endpoint_override: str | None = None,
        **kwargs: ty.Any,
    ) -> str | None:
        """Return the supplied endpoint.

        Using this plugin the same endpoint is returned regardless of the
        parameters passed to the plugin. endpoint_override overrides the
        endpoint specified when constructing the plugin.
        """
        return endpoint_override or self.endpoint

    def get_endpoint_data(
        self,
        session: 'ks_session.Session',
        *,
        endpoint_override: str | None = None,
        discover_versions: bool = True,
        **kwargs: ty.Any,
    ) -> discover.EndpointData | None:
        """Return a valid endpoint data for a the service.

        :param session: A session object that can be used for communication.
        :type session: keystoneauth1.session.Session
        :param str endpoint_override: URL to use for version discovery.
        :param bool discover_versions: Whether to get version metadata from
                                       the version discovery document even
                                       if it major api version info can be
                                       inferred from the url.
                                       (optional, defaults to True)
        :param kwargs: Ignored.

        :raises keystoneauth1.exceptions.http.HttpError: An error from an
                                                         invalid HTTP response.

        :return: Valid EndpointData or None if not available.
        :rtype: `keystoneauth1.discover.EndpointData` or None
        """
        return super().get_endpoint_data(
            session,
            endpoint_override=endpoint_override or self.endpoint,
            discover_versions=discover_versions,
            **kwargs,
        )