File: key_vault_client.py

package info (click to toggle)
python-azure 20181112%2Bgit-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 407,300 kB
  • sloc: python: 717,190; makefile: 201; sh: 76
file content (255 lines) | stat: -rw-r--r-- 10,797 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is
# regenerated.
# --------------------------------------------------------------------------
import uuid
from msrest.pipeline import ClientRawResponse
from msrestazure import AzureConfiguration
from msrestazure.azure_active_directory import BasicTokenAuthentication

from azure.profiles import KnownProfiles, ProfileDefinition
from azure.profiles.multiapiclient import MultiApiClientMixin
from .version import VERSION
from . import KeyVaultAuthentication

from azure.keyvault.v7_0.version import VERSION as v7_0_VERSION
from azure.keyvault.v2016_10_01.version import VERSION as v2016_10_01_VERSION


class KeyVaultClientConfiguration(AzureConfiguration):
    """Configuration for KeyVaultClient
    Note that all parameters used to create this instance are saved as instance
    attributes.

    :param credentials: Credentials needed for the client to connect to Azure.
    :type credentials: :mod:`A msrestazure Credentials
     object<msrestazure.azure_active_directory>`
    """

    def __init__(
            self, credentials):

        if credentials is None:
            raise ValueError("Parameter 'credentials' must not be None.")
        base_url = '{vaultBaseUrl}'

        super(KeyVaultClientConfiguration, self).__init__(base_url)

        self.add_user_agent('azure-keyvault/{}'.format(VERSION))
        self.add_user_agent('Azure-SDK-For-Python')

        self.credentials = credentials


class KeyVaultClient(MultiApiClientMixin):
    """The key vault client performs cryptographic key operations and vault operations against the Key Vault service.
    Implementation depends on the API version:

         * 2016-10-01: :class:`v2016_10_01.KeyVaultClient<azure.keyvault.v2016_10_01.KeyVaultClient>`
         * 7.0: :class:`v7_0.KeyVaultClient<azure.mgmt.keyvault.v7_0.KeyVaultClient>`

    :ivar config: Configuration for client.
    :vartype config: KeyVaultClientConfiguration

    :param credentials: Credentials needed for the client to connect to Azure.
    :type credentials: :mod:`A msrestazure Credentials
     object<msrestazure.azure_active_directory>`

    :param str api_version: API version to use if no profile is provided, or if
     missing in profile.
    :param profile: A profile definition, from KnownProfiles to dict.
    :type profile: azure.profiles.KnownProfiles
    """

    DEFAULT_API_VERSION = '7.0'
    _PROFILE_TAG = "azure.keyvault.KeyVaultClient"
    LATEST_PROFILE = ProfileDefinition({
        _PROFILE_TAG: {
            None: DEFAULT_API_VERSION
        }},
        _PROFILE_TAG + " latest"
    )

    _init_complete = False
    def __init__(self, credentials, api_version=None, profile=KnownProfiles.default):
        self.config = KeyVaultClientConfiguration(credentials)
        self._client_impls = {}
        self._entered = False
        super(KeyVaultClient, self).__init__(
            api_version=api_version,
            profile=profile
        )

        # if the supplied credentials instance is not derived from KeyVaultAuthBase but is an AAD credential type
        if not isinstance(credentials, KeyVaultAuthentication) and isinstance(credentials, BasicTokenAuthentication):

            # wrap the supplied credentials with a KeyVaultAuthentication instance. Use that for the credentials
            # supplied to the base client
            credentials = KeyVaultAuthentication(credentials=credentials)

        self._credentials = credentials
        self._init_complete = True

    @property
    def models(self):
        """Module depends on the API version:
            * 2016-10-01: :mod:`v2016_10_01.models<azure.keyvault.v2016_10_01.models>`
            * 7.0: :mod:`v7_0.models<azure.keyvault.v7_0.models>`
         """
        api_version = self._get_api_version(None)

        if api_version == v7_0_VERSION:
            from azure.keyvault.v7_0 import models as implModels
        elif api_version == v2016_10_01_VERSION:
            from azure.keyvault.v2016_10_01 import models as implModels
        else:
            raise NotImplementedError("APIVersion {} is not available".format(api_version))
        return implModels

    def _get_client_impl(self):
        """
        Get the versioned client implementation corresponding to the current profile.
        :return:  The versioned client implementation.
        """
        api_version = self._get_api_version(None)
        if api_version not in self._client_impls:
            self._create_client_impl(api_version)
        return self._client_impls[api_version]

    def _create_client_impl(self, api_version):
        """
        Creates the client implementation corresponding to the specifeid api_version.
        :param api_version:
        :return:
        """
        if api_version == v7_0_VERSION:
            from azure.keyvault.v7_0 import KeyVaultClient as ImplClient
        elif api_version == v2016_10_01_VERSION:
            from azure.keyvault.v2016_10_01 import KeyVaultClient as ImplClient
        else:
            raise NotImplementedError("APIVersion {} is not available".format(api_version))

        impl = ImplClient(credentials=self._credentials)
        impl.config = self.config

        # if __enter__ has previously been called and the impl client has __enter__ defined we need to call it
        if self._entered and hasattr(impl, '__enter__'):
            impl.__enter__()

        self._client_impls[api_version] = impl
        return impl

    def __enter__(self, *args, **kwargs):
        """
        Calls __enter__ on all client implementations which support it
        :param args: positional arguments to relay to client implementations of __enter__
        :param kwargs: keyword arguments to relay to client implementations of __enter__
        :return: returns the current KeyVaultClient instance
        """
        for _, impl in self._client_impls.items():
            if hasattr(impl, '__enter__'):
                impl.__enter__(*args, **kwargs)
        # mark the current KeyVaultClient as _entered so that client implementations instantiated
        # subsequently will also have __enter__ called on them as appropriate
        self._entered = True
        return self

    def __exit__(self, *args, **kwargs):
        """
        Calls __exit__ on all client implementations which support it
        :param args: positional arguments to relay to client implementations of __enter__
        :param kwargs: keyword arguments to relay to client implementations of __enter__
        :return: returns the current KeyVaultClient instance
        """
        for _, impl in self._client_impls.items():
            if hasattr(impl, '__exit__'):
                impl.__exit__(*args, **kwargs)
        return self

    def __getattr__(self, name):
        """
        In the case that the attribute is not defined on the custom KeyVaultClient.  Attempt to get
        the attribute from the versioned client implementation corresponding to the current profile.
        :param name: Name of the attribute retrieve from the current versioned client implementation
        :return: The value of the specified attribute on the current client implementation.
        """
        impl = self._get_client_impl()
        return getattr(impl, name)

    def __setattr__(self, name, attr):
        """
        Sets the specified attribute either on the custom KeyVaultClient or the current underlying implementation.
        :param name: Name of the attribute to set
        :param attr: Value of the attribute to set
        :return: None
        """
        if self._init_complete and not hasattr(self, name):
            impl = self._get_client_impl()
            setattr(impl, name, attr)
        else:
            super(KeyVaultClient, self).__setattr__(name, attr)

    def get_pending_certificate_signing_request(self, vault_base_url, certificate_name, custom_headers=None, raw=False, **operation_config):
        """Gets the Base64 pending certificate signing request (PKCS-10).

        :param vault_base_url: The vault name, e.g.
         https://myvault.vault.azure.net
        :type vault_base_url: str
        :param certificate_name: The name of the certificate
        :type certificate_name: str
        :param dict custom_headers: headers that will be added to the request
        :param bool raw: returns the direct response alongside the
         deserialized response
        :param operation_config: :ref:`Operation configuration
         overrides<msrest:optionsforoperations>`.
        :return: Base64 encoded pending certificate signing request (PKCS-10).
        :rtype: str
        :rtype: :class:`ClientRawResponse<msrest.pipeline.ClientRawResponse>`
         if raw=true
        """
        # Construct URL
        url = '/certificates/{certificate-name}/pending'
        path_format_arguments = {
            'vaultBaseUrl': self._serialize.url("vault_base_url", vault_base_url, 'str', skip_quote=True),
            'certificate-name': self._serialize.url("certificate_name", certificate_name, 'str')
        }
        url = self._client.format_url(url, **path_format_arguments)

        # Construct parameters
        query_parameters = {}
        query_parameters['api-version'] = self._serialize.query("self.api_version", self.api_version, 'str')

        # Construct headers
        header_parameters = {}
        header_parameters['Accept'] = 'application/pkcs10'
        if self.config.generate_client_request_id:
            header_parameters['x-ms-client-request-id'] = str(uuid.uuid1())
        if custom_headers:
            header_parameters.update(custom_headers)
        if self.config.accept_language is not None:
            header_parameters['accept-language'] = self._serialize.header("self.config.accept_language", self.config.accept_language, 'str')

        # Construct and send request
        request = self._client.get(url, query_parameters, header_parameters)
        response = self._client.send(request, stream=True, **operation_config)

        if response.status_code not in [200]:
            raise self.models.KeyVaultErrorException(self._deserialize, response)

        deserialized = None

        if response.status_code == 200:
            deserialized = response.body() if hasattr(response, 'body') else response.content

        if raw:
            client_raw_response = ClientRawResponse(deserialized, response)
            return client_raw_response

        return deserialized