1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
import requests
import six
from collections import namedtuple
from certbot.plugins import dns_common
_GandiConfig = namedtuple('_GandiConfig', ('api_key', 'sharing_id', 'personal_access_token',))
_BaseDomain = namedtuple('_BaseDomain', ('fqdn'))
def get_config(api_key, sharing_id, personal_access_token=None):
return _GandiConfig(api_key=api_key, sharing_id=sharing_id, personal_access_token=personal_access_token)
def _get_json(response):
try:
data = response.json()
except ValueError:
return dict()
return data
def _get_response_message(response, default='<No reason given>'):
return _get_json(response).get('message', default)
def _headers(cfg):
if cfg.personal_access_token:
auth = 'Bearer ' + cfg.personal_access_token
else:
auth = 'Apikey ' + cfg.api_key
return {
'Content-Type': 'application/json',
'Authorization': auth,
}
def _get_url(*segs):
return 'https://api.gandi.net/v5/livedns/{}'.format('/'.join(segs))
def _request(cfg, method, segs, **kw):
headers = _headers(cfg)
url = _get_url(*segs)
return requests.request(method, url, headers=headers, params={'sharing_id': cfg.sharing_id}, **kw)
def _get_base_domain(cfg, domain):
for candidate_base_domain in dns_common.base_domain_name_guesses(domain):
response = _request(cfg, 'GET', ('domains', candidate_base_domain))
if response.ok:
data = _get_json(response)
fqdn = data.get('fqdn')
if fqdn:
return _BaseDomain(fqdn=fqdn)
return None
def _get_relative_name(base_domain, name):
suffix = '.' + base_domain.fqdn
return name[:-len(suffix)] if name.endswith(suffix) else None
def _get_txt_record(cfg, base_domain, relative_name):
response = _request(cfg, 'GET', ['domains', base_domain.fqdn, 'records', relative_name, 'TXT'])
if not response.ok:
return []
data = _get_json(response)
vals = data.get('rrset_values')
if vals:
return vals
else:
return []
def _update_txt_record(cfg, base_domain, relative_name, rrset):
return _request(cfg, 'PUT', ['domains', base_domain.fqdn, 'records', relative_name, 'TXT'],
json={'rrset_values': rrset})
def _update_record(cfg, domain, name, request_runner):
base_domain = _get_base_domain(cfg, domain)
if base_domain is None:
return 'Unable to get base domain for "{}"'.format(domain)
relative_name = _get_relative_name(base_domain, name)
if relative_name is None:
return 'Unable to derive relative name for "{}"'.format(name)
response = request_runner(base_domain, relative_name)
return None if response.ok else _get_response_message(response)
def add_txt_record(cfg, domain, name, value):
def requester(base_domain, relative_name):
rrset = [value] + _get_txt_record(cfg, base_domain, relative_name)
return _update_txt_record(cfg, base_domain, relative_name, rrset)
return _update_record(cfg, domain, name, requester)
def del_txt_record(cfg, domain, name, value):
def requester(base_domain, relative_name):
existing = _get_txt_record(cfg, base_domain, relative_name)
rrset = list(filter(lambda rr: rr.strip('"') != value, existing))
return _update_txt_record(cfg, base_domain, relative_name, rrset)
return _update_record(cfg, domain, name, requester)
|