1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
|
"""DNS Authenticator for Google Cloud DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import cast
import google.auth
from google.auth import exceptions as googleauth_exceptions
from googleapiclient import discovery
from googleapiclient import errors as googleapiclient_errors
from certbot import errors
from certbot.plugins import dns_common
logger = logging.getLogger(__name__)
ADC_URL = 'https://cloud.google.com/docs/authentication/application-default-credentials'
ACCT_URL = 'https://developers.google.com/identity/protocols/OAuth2ServiceAccount#creatinganaccount'
PERMISSIONS_URL = 'https://cloud.google.com/dns/access-control#permissions_and_roles'
METADATA_URL = 'http://metadata.google.internal/computeMetadata/v1/'
METADATA_HEADERS = {'Metadata-Flavor': 'Google'}
class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for Google Cloud DNS
This Authenticator uses the Google Cloud DNS API to fulfill a dns-01 challenge.
"""
description = ('Obtain certificates using a DNS TXT record (if you are using Google Cloud DNS '
'for DNS).')
ttl = 60
google_client = None
@classmethod
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 60) -> None:
super().add_parser_arguments(add, default_propagation_seconds=60)
add('credentials',
help=('Path to Google Cloud DNS service account JSON file to use instead of relying on'
' Application Default Credentials (ADC). (See {0} for information about ADC, {1}'
' for information about creating a service account, and {2} for information about'
' the permissions required to modify Cloud DNS records.)')
.format(ADC_URL, ACCT_URL, PERMISSIONS_URL),
default=None)
add('project',
help=('The ID of the Google Cloud project that the Google Cloud DNS managed zone(s)' +
' reside in. This will be determined automatically if not specified.'),
default=None)
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the Google Cloud DNS API.'
def _setup_credentials(self) -> None:
if self.conf('credentials') is not None:
self._configure_file('credentials',
'path to Google Cloud DNS service account JSON file')
dns_common.validate_file_permissions(self.conf('credentials'))
try:
self._get_google_client()
except googleauth_exceptions.DefaultCredentialsError as e:
raise errors.PluginError('Authentication using Google Application Default Credentials '
'failed ({}). Please configure credentials using'
' --dns-google-credentials <file>'.format(e))
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_google_client().add_txt_record(domain, validation_name, validation, self.ttl)
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_google_client().del_txt_record(domain, validation_name, validation, self.ttl)
def _get_google_client(self) -> '_GoogleClient':
if self.google_client is None:
self.google_client = _GoogleClient(self.conf('credentials'), self.conf('project'))
return self.google_client
class _GoogleClient:
"""
Encapsulates all communication with the Google Cloud DNS API.
"""
def __init__(self, account_json: Optional[str] = None,
dns_project_id: Optional[str] = None,
dns_api: Optional[discovery.Resource] = None) -> None:
scopes = ['https://www.googleapis.com/auth/ndev.clouddns.readwrite']
credentials = None
project_id = None
if account_json is not None:
try:
credentials, project_id = google.auth.load_credentials_from_file( # type: ignore [no-untyped-call, unused-ignore] # pylint: disable=line-too-long
account_json, scopes=scopes)
except googleauth_exceptions.GoogleAuthError as e:
raise errors.PluginError(
"Error loading credentials file '{}': {}".format(account_json, e))
else:
credentials, project_id = google.auth.default(scopes=scopes) # type: ignore [no-untyped-call, unused-ignore] # pylint: disable=line-too-long
if dns_project_id is not None:
project_id = dns_project_id
if not project_id:
raise errors.PluginError('The Google Cloud project could not be automatically '
'determined. Please configure it using --dns-google-project'
' <project>.')
self.project_id = project_id
if not dns_api:
self.dns = discovery.build('dns', 'v1',
credentials=credentials,
cache_discovery=False)
else:
self.dns = dns_api
def add_txt_record(self, domain: str, record_name: str, record_content: str,
record_ttl: int) -> None:
"""
Add a TXT record using the supplied information.
:param str domain: The domain to use to look up the managed zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:param int record_ttl: The record TTL (number of seconds that the record may be cached).
:raises certbot.errors.PluginError: if an error occurs communicating with the Google API
"""
zone_id = self._find_managed_zone_id(domain)
record_contents = self.get_existing_txt_rrset(zone_id, record_name)
if record_contents is None:
# If it wasn't possible to fetch the records at this label (missing .list permission),
# assume there aren't any (#5678). If there are actually records here, this will fail
# with HTTP 409/412 API errors.
record_contents = {"rrdatas": []}
add_records = record_contents["rrdatas"][:]
if "\""+record_content+"\"" in record_contents["rrdatas"]:
# The process was interrupted previously and validation token exists
return
add_records.append(record_content)
data = {
"kind": "dns#change",
"additions": [
{
"kind": "dns#resourceRecordSet",
"type": "TXT",
"name": record_name + ".",
"rrdatas": add_records,
"ttl": record_ttl,
},
],
}
if record_contents["rrdatas"]:
# We need to remove old records in the same request
data["deletions"] = [
{
"kind": "dns#resourceRecordSet",
"type": "TXT",
"name": record_name + ".",
"rrdatas": record_contents["rrdatas"],
"ttl": record_contents["ttl"],
},
]
changes = self.dns.changes()
try:
request = changes.create(project=self.project_id, managedZone=zone_id, body=data)
response = request.execute()
status = response['status']
change = response['id']
while status == 'pending':
request = changes.get(project=self.project_id, managedZone=zone_id, changeId=change)
response = request.execute()
status = response['status']
except googleapiclient_errors.Error as e:
logger.error('Encountered error adding TXT record: %s', e)
raise errors.PluginError('Error communicating with the Google Cloud DNS API: {0}'
.format(e))
def del_txt_record(self, domain: str, record_name: str, record_content: str,
record_ttl: int) -> None:
"""
Delete a TXT record using the supplied information.
:param str domain: The domain to use to look up the managed zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:param int record_ttl: The record TTL (number of seconds that the record may be cached).
:raises certbot.errors.PluginError: if an error occurs communicating with the Google API
"""
try:
zone_id = self._find_managed_zone_id(domain)
except errors.PluginError:
logger.warning('Error finding zone. Skipping cleanup.')
return
record_contents = self.get_existing_txt_rrset(zone_id, record_name)
if record_contents is None:
# If it wasn't possible to fetch the records at this label (missing .list permission),
# assume there aren't any (#5678). If there are actually records here, this will fail
# with HTTP 409/412 API errors.
record_contents = {"rrdatas": ["\"" + record_content + "\""], "ttl": record_ttl}
data = {
"kind": "dns#change",
"deletions": [
{
"kind": "dns#resourceRecordSet",
"type": "TXT",
"name": record_name + ".",
"rrdatas": record_contents["rrdatas"],
"ttl": record_contents["ttl"],
},
],
}
# Remove the record being deleted from the list
readd_contents = [r for r in record_contents["rrdatas"]
if r != "\"" + record_content + "\""]
if readd_contents:
# We need to remove old records in the same request
data["additions"] = [
{
"kind": "dns#resourceRecordSet",
"type": "TXT",
"name": record_name + ".",
"rrdatas": readd_contents,
"ttl": record_contents["ttl"],
},
]
changes = self.dns.changes()
try:
request = changes.create(project=self.project_id, managedZone=zone_id, body=data)
request.execute()
except googleapiclient_errors.Error as e:
logger.warning('Encountered error deleting TXT record: %s', e)
def get_existing_txt_rrset(self, zone_id: str, record_name: str) -> Optional[Dict[str, Any]]:
"""
Get existing TXT records from the RRset for the record name.
If an error occurs while requesting the record set, it is suppressed
and None is returned.
:param str zone_id: The ID of the managed zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:returns: The resourceRecordSet corresponding to `record_name` or None
:rtype: `resourceRecordSet <https://cloud.google.com/dns/docs/reference/v1/resourceRecordSets#resource>` or `None` # pylint: disable=line-too-long
"""
rrs_request = self.dns.resourceRecordSets()
# Add dot as the API returns absolute domains
record_name += "."
request = rrs_request.list(project=self.project_id, managedZone=zone_id, name=record_name,
type="TXT")
try:
response = request.execute()
except googleapiclient_errors.Error:
logger.info("Unable to list existing records. If you're "
"requesting a wildcard certificate, this might not work.")
logger.debug("Error was:", exc_info=True)
else:
if response and response["rrsets"]:
return cast(dict[str, Any], response["rrsets"][0])
return None
def _find_managed_zone_id(self, domain: str) -> str:
"""
Find the managed zone for a given domain.
:param str domain: The domain for which to find the managed zone.
:returns: The ID of the managed zone, if found.
:rtype: str
:raises certbot.errors.PluginError: if the managed zone cannot be found.
"""
zone_dns_name_guesses = dns_common.base_domain_name_guesses(domain)
mz = self.dns.managedZones()
for zone_name in zone_dns_name_guesses:
try:
request = mz.list(project=self.project_id, dnsName=zone_name + '.')
response = request.execute()
zones = response['managedZones']
except googleapiclient_errors.Error as e:
raise errors.PluginError('Encountered error finding managed zone: {0}'
.format(e))
for zone in zones:
zone_id: str = zone['id']
if zone['visibility'] == "public":
logger.debug('Found id of %s for %s using name %s', zone_id, domain, zone_name)
return zone_id
raise errors.PluginError('Unable to determine managed zone for {0} using zone names: {1}.'
.format(domain, zone_dns_name_guesses))
|