1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
|
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for Agent Identity credentials."""
import base64
import hashlib
import logging
import os
import re
import time
from urllib.parse import quote, urlparse
from google.auth import environment_vars
from google.auth import exceptions
_LOGGER = logging.getLogger(__name__)
CRYPTOGRAPHY_NOT_FOUND_ERROR = (
"The cryptography library is required for certificate-based authentication."
"Please install it with `pip install google-auth[cryptography]`."
)
# SPIFFE trust domain patterns for Agent Identities.
_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS = [
r"^agents\.global\.org-\d+\.system\.id\.goog$",
r"^agents\.global\.proj-\d+\.system\.id\.goog$",
]
_WELL_KNOWN_CERT_PATH = "/var/run/secrets/workload-spiffe-credentials/certificates.pem"
# Constants for polling the certificate file.
_FAST_POLL_CYCLES = 50
_FAST_POLL_INTERVAL = 0.1 # 100ms
_SLOW_POLL_INTERVAL = 0.5 # 500ms
_TOTAL_TIMEOUT = 30 # seconds
# Calculate the number of slow poll cycles based on the total timeout.
_SLOW_POLL_CYCLES = int(
(_TOTAL_TIMEOUT - (_FAST_POLL_CYCLES * _FAST_POLL_INTERVAL)) / _SLOW_POLL_INTERVAL
)
_POLLING_INTERVALS = ([_FAST_POLL_INTERVAL] * _FAST_POLL_CYCLES) + (
[_SLOW_POLL_INTERVAL] * _SLOW_POLL_CYCLES
)
def _is_certificate_file_ready(path):
"""Checks if a file exists and is not empty."""
return path and os.path.exists(path) and os.path.getsize(path) > 0
def get_agent_identity_certificate_path():
"""Gets the certificate path from the certificate config file.
The path to the certificate config file is read from the
GOOGLE_API_CERTIFICATE_CONFIG environment variable. This function
implements a retry mechanism to handle cases where the environment
variable is set before the files are available on the filesystem.
Returns:
str: The path to the leaf certificate file.
Raises:
google.auth.exceptions.RefreshError: If the certificate config file
or the certificate file cannot be found after retries.
"""
import json
cert_config_path = os.environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
if not cert_config_path:
return None
has_logged_warning = False
for interval in _POLLING_INTERVALS:
try:
with open(cert_config_path, "r") as f:
cert_config = json.load(f)
cert_path = (
cert_config.get("cert_configs", {})
.get("workload", {})
.get("cert_path")
)
if _is_certificate_file_ready(cert_path):
return cert_path
except (IOError, ValueError, KeyError):
if not has_logged_warning:
_LOGGER.warning(
"Certificate config file not found at %s (from %s environment "
"variable). Retrying for up to %s seconds.",
cert_config_path,
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG,
_TOTAL_TIMEOUT,
)
has_logged_warning = True
pass
# As a fallback, check the well-known certificate path.
if _is_certificate_file_ready(_WELL_KNOWN_CERT_PATH):
return _WELL_KNOWN_CERT_PATH
# A sleep is required in two cases:
# 1. The config file is not found (the except block).
# 2. The config file is found, but the certificate is not yet available.
# In both cases, we need to poll, so we sleep on every iteration
# that doesn't return a certificate.
time.sleep(interval)
raise exceptions.RefreshError(
"Certificate config or certificate file not found after multiple retries. "
f"Token binding protection is failing. You can turn off this protection by setting "
f"{environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES} to false "
"to fall back to unbound tokens."
)
def get_and_parse_agent_identity_certificate():
"""Gets and parses the agent identity certificate if not opted out.
Checks if the user has opted out of certificate-bound tokens. If not,
it gets the certificate path, reads the file, and parses it.
Returns:
The parsed certificate object if found and not opted out, otherwise None.
"""
# If the user has opted out of cert bound tokens, there is no need to
# look up the certificate.
is_opted_out = (
os.environ.get(
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
"true",
).lower()
== "false"
)
if is_opted_out:
return None
cert_path = get_agent_identity_certificate_path()
if not cert_path:
return None
with open(cert_path, "rb") as cert_file:
cert_bytes = cert_file.read()
return parse_certificate(cert_bytes)
def parse_certificate(cert_bytes):
"""Parses a PEM-encoded certificate.
Args:
cert_bytes (bytes): The PEM-encoded certificate bytes.
Returns:
cryptography.x509.Certificate: The parsed certificate object.
"""
try:
from cryptography import x509
return x509.load_pem_x509_certificate(cert_bytes)
except ImportError as e:
raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e
def _is_agent_identity_certificate(cert):
"""Checks if a certificate is an Agent Identity certificate.
This is determined by checking the Subject Alternative Name (SAN) for a
SPIFFE ID with a trust domain matching Agent Identity patterns.
Args:
cert (cryptography.x509.Certificate): The parsed certificate object.
Returns:
bool: True if the certificate is an Agent Identity certificate,
False otherwise.
"""
try:
from cryptography import x509
from cryptography.x509.oid import ExtensionOID
try:
ext = cert.extensions.get_extension_for_oid(
ExtensionOID.SUBJECT_ALTERNATIVE_NAME
)
except x509.ExtensionNotFound:
return False
uris = ext.value.get_values_for_type(x509.UniformResourceIdentifier)
for uri in uris:
parsed_uri = urlparse(uri)
if parsed_uri.scheme == "spiffe":
trust_domain = parsed_uri.netloc
for pattern in _AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS:
if re.match(pattern, trust_domain):
return True
return False
except ImportError as e:
raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e
def calculate_certificate_fingerprint(cert):
"""Calculates the URL-encoded, unpadded, base64-encoded SHA256 hash of a
DER-encoded certificate.
Args:
cert (cryptography.x509.Certificate): The parsed certificate object.
Returns:
str: The URL-encoded, unpadded, base64-encoded SHA256 fingerprint.
"""
try:
from cryptography.hazmat.primitives import serialization
der_cert = cert.public_bytes(serialization.Encoding.DER)
fingerprint = hashlib.sha256(der_cert).digest()
# The certificate fingerprint is generated in two steps to align with GFE's
# expectations and ensure proper URL transmission:
# 1. Standard base64 encoding is applied, and padding ('=') is removed.
# 2. The resulting string is then URL-encoded to handle special characters
# ('+', '/') that would otherwise be misinterpreted in URL parameters.
base64_fingerprint = base64.b64encode(fingerprint).decode("utf-8")
unpadded_base64_fingerprint = base64_fingerprint.rstrip("=")
return quote(unpadded_base64_fingerprint)
except ImportError as e:
raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e
def should_request_bound_token(cert):
"""Determines if a bound token should be requested.
This is based on the GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES
environment variable and whether the certificate is an agent identity cert.
Args:
cert (cryptography.x509.Certificate): The parsed certificate object.
Returns:
bool: True if a bound token should be requested, False otherwise.
"""
is_agent_cert = _is_agent_identity_certificate(cert)
is_opted_in = (
os.environ.get(
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
"true",
).lower()
== "true"
)
return is_agent_cert and is_opted_in
def get_cached_cert_fingerprint(cached_cert):
"""Returns the fingerprint of the cached certificate."""
if cached_cert:
cert_obj = parse_certificate(cached_cert)
cached_cert_fingerprint = calculate_certificate_fingerprint(cert_obj)
else:
raise ValueError("mTLS connection is not configured.")
return cached_cert_fingerprint
|