1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
|
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Authorization support for gRPC."""
from __future__ import absolute_import
import logging
import os
from google.auth import environment_vars
from google.auth import exceptions
from google.auth.transport import _mtls_helper
from google.oauth2 import service_account
try:
import grpc # type: ignore
except ImportError as caught_exc: # pragma: NO COVER
raise ImportError(
"gRPC is not installed from please install the grpcio package to use the gRPC transport."
) from caught_exc
_LOGGER = logging.getLogger(__name__)
class AuthMetadataPlugin(grpc.AuthMetadataPlugin):
"""A `gRPC AuthMetadataPlugin`_ that inserts the credentials into each
request.
.. _gRPC AuthMetadataPlugin:
http://www.grpc.io/grpc/python/grpc.html#grpc.AuthMetadataPlugin
Args:
credentials (google.auth.credentials.Credentials): The credentials to
add to requests.
request (google.auth.transport.Request): A HTTP transport request
object used to refresh credentials as needed.
default_host (Optional[str]): A host like "pubsub.googleapis.com".
This is used when a self-signed JWT is created from service
account credentials.
"""
def __init__(self, credentials, request, default_host=None):
# pylint: disable=no-value-for-parameter
# pylint doesn't realize that the super method takes no arguments
# because this class is the same name as the superclass.
super(AuthMetadataPlugin, self).__init__()
self._credentials = credentials
self._request = request
self._default_host = default_host
def _get_authorization_headers(self, context):
"""Gets the authorization headers for a request.
Returns:
Sequence[Tuple[str, str]]: A list of request headers (key, value)
to add to the request.
"""
headers = {}
# https://google.aip.dev/auth/4111
# Attempt to use self-signed JWTs when a service account is used.
# A default host must be explicitly provided since it cannot always
# be determined from the context.service_url.
if isinstance(self._credentials, service_account.Credentials):
self._credentials._create_self_signed_jwt(
"https://{}/".format(self._default_host) if self._default_host else None
)
self._credentials.before_request(
self._request, context.method_name, context.service_url, headers
)
return list(headers.items())
def __call__(self, context, callback):
"""Passes authorization metadata into the given callback.
Args:
context (grpc.AuthMetadataContext): The RPC context.
callback (grpc.AuthMetadataPluginCallback): The callback that will
be invoked to pass in the authorization metadata.
"""
callback(self._get_authorization_headers(context), None)
def secure_authorized_channel(
credentials,
request,
target,
ssl_credentials=None,
client_cert_callback=None,
**kwargs
):
"""Creates a secure authorized gRPC channel.
This creates a channel with SSL and :class:`AuthMetadataPlugin`. This
channel can be used to create a stub that can make authorized requests.
Users can configure client certificate or rely on device certificates to
establish a mutual TLS channel, if the `GOOGLE_API_USE_CLIENT_CERTIFICATE`
variable is explicitly set to `true`.
Example::
import google.auth
import google.auth.transport.grpc
import google.auth.transport.requests
from google.cloud.speech.v1 import cloud_speech_pb2
# Get credentials.
credentials, _ = google.auth.default()
# Get an HTTP request function to refresh credentials.
request = google.auth.transport.requests.Request()
# Create a channel.
channel = google.auth.transport.grpc.secure_authorized_channel(
credentials, regular_endpoint, request,
ssl_credentials=grpc.ssl_channel_credentials())
# Use the channel to create a stub.
cloud_speech.create_Speech_stub(channel)
Usage:
There are actually a couple of options to create a channel, depending on if
you want to create a regular or mutual TLS channel.
First let's list the endpoints (regular vs mutual TLS) to choose from::
regular_endpoint = 'speech.googleapis.com:443'
mtls_endpoint = 'speech.mtls.googleapis.com:443'
Option 1: create a regular (non-mutual) TLS channel by explicitly setting
the ssl_credentials::
regular_ssl_credentials = grpc.ssl_channel_credentials()
channel = google.auth.transport.grpc.secure_authorized_channel(
credentials, regular_endpoint, request,
ssl_credentials=regular_ssl_credentials)
Option 2: create a mutual TLS channel by calling a callback which returns
the client side certificate and the key (Note that
`GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable must be explicitly
set to `true`)::
def my_client_cert_callback():
code_to_load_client_cert_and_key()
if loaded:
return (pem_cert_bytes, pem_key_bytes)
raise MyClientCertFailureException()
try:
channel = google.auth.transport.grpc.secure_authorized_channel(
credentials, mtls_endpoint, request,
client_cert_callback=my_client_cert_callback)
except MyClientCertFailureException:
# handle the exception
Option 3: use application default SSL credentials. It searches and uses
the command in a context aware metadata file, which is available on devices
with endpoint verification support (Note that
`GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable must be explicitly
set to `true`).
See https://cloud.google.com/endpoint-verification/docs/overview::
try:
default_ssl_credentials = SslCredentials()
except:
# Exception can be raised if the context aware metadata is malformed.
# See :class:`SslCredentials` for the possible exceptions.
# Choose the endpoint based on the SSL credentials type.
if default_ssl_credentials.is_mtls:
endpoint_to_use = mtls_endpoint
else:
endpoint_to_use = regular_endpoint
channel = google.auth.transport.grpc.secure_authorized_channel(
credentials, endpoint_to_use, request,
ssl_credentials=default_ssl_credentials)
Option 4: not setting ssl_credentials and client_cert_callback. For devices
without endpoint verification support or `GOOGLE_API_USE_CLIENT_CERTIFICATE`
environment variable is not `true`, a regular TLS channel is created;
otherwise, a mutual TLS channel is created, however, the call should be
wrapped in a try/except block in case of malformed context aware metadata.
The following code uses regular_endpoint, it works the same no matter the
created channle is regular or mutual TLS. Regular endpoint ignores client
certificate and key::
channel = google.auth.transport.grpc.secure_authorized_channel(
credentials, regular_endpoint, request)
The following code uses mtls_endpoint, if the created channle is regular,
and API mtls_endpoint is confgured to require client SSL credentials, API
calls using this channel will be rejected::
channel = google.auth.transport.grpc.secure_authorized_channel(
credentials, mtls_endpoint, request)
Args:
credentials (google.auth.credentials.Credentials): The credentials to
add to requests.
request (google.auth.transport.Request): A HTTP transport request
object used to refresh credentials as needed. Even though gRPC
is a separate transport, there's no way to refresh the credentials
without using a standard http transport.
target (str): The host and port of the service.
ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
credentials. This can be used to specify different certificates.
This argument is mutually exclusive with client_cert_callback;
providing both will raise an exception.
If ssl_credentials and client_cert_callback are None, application
default SSL credentials are used if `GOOGLE_API_USE_CLIENT_CERTIFICATE`
environment variable is explicitly set to `true`, otherwise one way TLS
SSL credentials are used.
client_cert_callback (Callable[[], (bytes, bytes)]): Optional
callback function to obtain client certicate and key for mutual TLS
connection. This argument is mutually exclusive with
ssl_credentials; providing both will raise an exception.
This argument does nothing unless `GOOGLE_API_USE_CLIENT_CERTIFICATE`
environment variable is explicitly set to `true`.
kwargs: Additional arguments to pass to :func:`grpc.secure_channel`.
Returns:
grpc.Channel: The created gRPC channel.
Raises:
google.auth.exceptions.MutualTLSChannelError: If mutual TLS channel
creation failed for any reason.
"""
# Create the metadata plugin for inserting the authorization header.
metadata_plugin = AuthMetadataPlugin(credentials, request)
# Create a set of grpc.CallCredentials using the metadata plugin.
google_auth_credentials = grpc.metadata_call_credentials(metadata_plugin)
if ssl_credentials and client_cert_callback:
raise exceptions.MalformedError(
"Received both ssl_credentials and client_cert_callback; "
"these are mutually exclusive."
)
# If SSL credentials are not explicitly set, try client_cert_callback and ADC.
if not ssl_credentials:
use_client_cert = os.getenv(
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE, "false"
)
if use_client_cert == "true" and client_cert_callback:
# Use the callback if provided.
cert, key = client_cert_callback()
ssl_credentials = grpc.ssl_channel_credentials(
certificate_chain=cert, private_key=key
)
elif use_client_cert == "true":
# Use application default SSL credentials.
adc_ssl_credentils = SslCredentials()
ssl_credentials = adc_ssl_credentils.ssl_credentials
else:
ssl_credentials = grpc.ssl_channel_credentials()
# Combine the ssl credentials and the authorization credentials.
composite_credentials = grpc.composite_channel_credentials(
ssl_credentials, google_auth_credentials
)
return grpc.secure_channel(target, composite_credentials, **kwargs)
class SslCredentials:
"""Class for application default SSL credentials.
The behavior is controlled by `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment
variable whose default value is `false`. Client certificate will not be used
unless the environment variable is explicitly set to `true`. See
https://google.aip.dev/auth/4114
If the environment variable is `true`, then for devices with endpoint verification
support, a device certificate will be automatically loaded and mutual TLS will
be established.
See https://cloud.google.com/endpoint-verification/docs/overview.
"""
def __init__(self):
use_client_cert = os.getenv(
environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE, "false"
)
if use_client_cert != "true":
self._is_mtls = False
else:
# Load client SSL credentials.
metadata_path = _mtls_helper._check_config_path(
_mtls_helper.CONTEXT_AWARE_METADATA_PATH
)
self._is_mtls = metadata_path is not None
@property
def ssl_credentials(self):
"""Get the created SSL channel credentials.
For devices with endpoint verification support, if the device certificate
loading has any problems, corresponding exceptions will be raised. For
a device without endpoint verification support, no exceptions will be
raised.
Returns:
grpc.ChannelCredentials: The created grpc channel credentials.
Raises:
google.auth.exceptions.MutualTLSChannelError: If mutual TLS channel
creation failed for any reason.
"""
if self._is_mtls:
try:
_, cert, key, _ = _mtls_helper.get_client_ssl_credentials()
self._ssl_credentials = grpc.ssl_channel_credentials(
certificate_chain=cert, private_key=key
)
except exceptions.ClientCertError as caught_exc:
new_exc = exceptions.MutualTLSChannelError(caught_exc)
raise new_exc from caught_exc
else:
self._ssl_credentials = grpc.ssl_channel_credentials()
return self._ssl_credentials
@property
def is_mtls(self):
"""Indicates if the created SSL channel credentials is mutual TLS."""
return self._is_mtls
|