File: slugs.py

package info (click to toggle)
python-pykmip 0.10.0-10
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 7,796 kB
  • sloc: python: 102,456; makefile: 33; sh: 12
file content (108 lines) | stat: -rw-r--r-- 3,890 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# Copyright (c) 2018 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import requests
import six

from kmip.core import exceptions
from kmip.services.server.auth import api
from kmip.services.server.auth import utils


class SLUGSConnector(api.AuthAPI):
    """
    An authentication API connector for a SLUGS service.
    """

    def __init__(self, url=None):
        """
        Construct a SLUGSConnector.

        Args:
            url (string): The base URL for the remote SLUGS instance. Optional,
                defaults to None. Required for authentication.
        """
        self._url = None
        self.users_url = None
        self.groups_url = None

        self.url = url

    @property
    def url(self):
        return self._url

    @url.setter
    def url(self, value):
        if value is None:
            self._url = None
            self.users_url = None
            self.groups_url = None
        elif isinstance(value, six.string_types):
            self._url = value
            if not self._url.endswith("/"):
                self._url += "/"
            self.users_url = self._url + "users/{}"
            self.groups_url = self.users_url + "/groups"
        else:
            raise TypeError("URL must be a string.")

    def authenticate(self,
                     connection_certificate=None,
                     connection_info=None,
                     request_credentials=None):
        """
        Query the configured SLUGS service with the provided credentials.

        Args:
            connection_certificate (cryptography.x509.Certificate): An X.509
                certificate object obtained from the connection being
                authenticated. Required for SLUGS authentication.
            connection_info (tuple): A tuple of information pertaining to the
                connection being authenticated, including the source IP address
                and a timestamp (e.g., ('127.0.0.1', 1519759267.467451)).
                Optional, defaults to None. Ignored for SLUGS authentication.
            request_credentials (list): A list of KMIP Credential structures
                containing credential information to use for authentication.
                Optional, defaults to None. Ignored for SLUGS authentication.
        """
        if (self.users_url is None) or (self.groups_url is None):
            raise exceptions.ConfigurationError(
                "The SLUGS URL must be specified."
            )

        user_id = utils.get_client_identity_from_certificate(
            connection_certificate
        )

        try:
            response = requests.get(self.users_url.format(user_id))
        except Exception:
            raise exceptions.ConfigurationError(
                "A connection could not be established using the SLUGS URL."
            )
        if response.status_code == 404:
            raise exceptions.PermissionDenied(
                "Unrecognized user ID: {}".format(user_id)
            )

        response = requests.get(self.groups_url.format(user_id))
        if response.status_code == 404:
            raise exceptions.PermissionDenied(
                "Group information could not be retrieved for user ID: "
                "{}".format(user_id)
            )

        return user_id, response.json().get('groups')