1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
|
"""A class that performs HTTP-01 challenges for Apache"""
import errno
import logging
from typing import List
from typing import Set
from typing import TYPE_CHECKING
from acme.challenges import KeyAuthorizationChallengeResponse
from certbot import errors
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge
from certbot.compat import filesystem
from certbot.compat import os
from certbot.plugins import common
from certbot_apache._internal.obj import VirtualHost
from certbot_apache._internal.parser import get_aug_path
if TYPE_CHECKING:
from certbot_apache._internal.configurator import ApacheConfigurator # pragma: no cover
logger = logging.getLogger(__name__)
class ApacheHttp01(common.ChallengePerformer):
"""Class that performs HTTP-01 challenges within the Apache configurator."""
CONFIG_TEMPLATE24_PRE = """\
RewriteRule ^/\\.well-known/acme-challenge/([A-Za-z0-9-_=]+)$ {0}/$1 [END]
"""
CONFIG_TEMPLATE24_POST = """\
RewriteEngine on
<Directory {0}>
Require all granted
</Directory>
<Location /.well-known/acme-challenge>
Require all granted
</Location>
"""
def __init__(self, configurator: "ApacheConfigurator") -> None:
super().__init__(configurator)
self.configurator: "ApacheConfigurator"
self.challenge_conf_pre = os.path.join(
self.configurator.conf("challenge-location"),
"le_http_01_challenge_pre.conf")
self.challenge_conf_post = os.path.join(
self.configurator.conf("challenge-location"),
"le_http_01_challenge_post.conf")
self.challenge_dir = os.path.join(
self.configurator.config.work_dir,
"http_challenges")
self.moded_vhosts: Set[VirtualHost] = set()
def perform(self) -> List[KeyAuthorizationChallengeResponse]:
"""Perform all HTTP-01 challenges."""
if not self.achalls:
return []
# Save any changes to the configuration as a precaution
# About to make temporary changes to the config
self.configurator.save("Changes before challenge setup", True)
self.configurator.ensure_listen(str(self.configurator.config.http01_port))
self.prepare_http01_modules()
responses = self._set_up_challenges()
self._mod_config()
# Save reversible changes
self.configurator.save("HTTP Challenge", True)
return responses
def prepare_http01_modules(self) -> None:
"""Make sure that we have the needed modules available for http01"""
if self.configurator.conf("handle-modules"):
needed_modules = ["rewrite", "authz_core"]
for mod in needed_modules:
if mod + "_module" not in self.configurator.parser.modules:
self.configurator.enable_mod(mod, temp=True)
def _mod_config(self) -> None:
selected_vhosts: List[VirtualHost] = []
http_port = str(self.configurator.config.http01_port)
# Search for VirtualHosts matching by name
for chall in self.achalls:
selected_vhosts += self._matching_vhosts(chall.domain)
# Ensure that we have one or more VirtualHosts that we can continue
# with. (one that listens to port configured with --http-01-port)
found = False
for vhost in selected_vhosts:
if any(a.is_wildcard() or a.get_port() == http_port for a in vhost.addrs):
found = True
# If there's at least one eligible VirtualHost, also add all unnamed VirtualHosts
# because they might match at runtime (#8890)
if found:
selected_vhosts += self._unnamed_vhosts()
# Otherwise, add every Virtualhost which listens on the right port
else:
selected_vhosts += self._relevant_vhosts()
# Add the challenge configuration
for vh in selected_vhosts:
self._set_up_include_directives(vh)
self.configurator.reverter.register_file_creation(
True, self.challenge_conf_pre)
self.configurator.reverter.register_file_creation(
True, self.challenge_conf_post)
config_text_pre = self.CONFIG_TEMPLATE24_PRE.format(self.challenge_dir)
config_text_post = self.CONFIG_TEMPLATE24_POST.format(self.challenge_dir)
logger.debug("writing a pre config file with text:\n %s", config_text_pre)
with open(self.challenge_conf_pre, "w") as new_conf:
new_conf.write(config_text_pre)
logger.debug("writing a post config file with text:\n %s", config_text_post)
with open(self.challenge_conf_post, "w") as new_conf:
new_conf.write(config_text_post)
def _matching_vhosts(self, domain: str) -> List[VirtualHost]:
"""Return all VirtualHost objects that have the requested domain name or
a wildcard name that would match the domain in ServerName or ServerAlias
directive.
"""
matching_vhosts = []
for vhost in self.configurator.vhosts:
if self.configurator.domain_in_names(vhost.get_names(), domain):
# domain_in_names also matches the exact names, so no need
# to check "domain in vhost.get_names()" explicitly here
matching_vhosts.append(vhost)
return matching_vhosts
def _relevant_vhosts(self) -> List[VirtualHost]:
http01_port = str(self.configurator.config.http01_port)
relevant_vhosts: List[VirtualHost] = []
for vhost in self.configurator.vhosts:
if any(a.is_wildcard() or a.get_port() == http01_port for a in vhost.addrs):
if not vhost.ssl:
relevant_vhosts.append(vhost)
if not relevant_vhosts:
raise errors.PluginError(
"Unable to find a virtual host listening on port {0} which is"
" currently needed for Certbot to prove to the CA that you"
" control your domain. Please add a virtual host for port"
" {0}.".format(http01_port))
return relevant_vhosts
def _unnamed_vhosts(self) -> List[VirtualHost]:
"""Return all VirtualHost objects with no ServerName"""
return [vh for vh in self.configurator.vhosts if vh.name is None]
def _set_up_challenges(self) -> List[KeyAuthorizationChallengeResponse]:
if not os.path.isdir(self.challenge_dir):
with filesystem.temp_umask(0o022):
try:
filesystem.makedirs(self.challenge_dir, 0o755)
except OSError as exception:
if exception.errno not in (errno.EEXIST, errno.EISDIR):
raise errors.PluginError(
"Couldn't create root for http-01 challenge")
responses = []
for achall in self.achalls:
responses.append(self._set_up_challenge(achall))
return responses
def _set_up_challenge(self, achall: KeyAuthorizationAnnotatedChallenge
) -> KeyAuthorizationChallengeResponse:
response: KeyAuthorizationChallengeResponse
response, validation = achall.response_and_validation()
name: str = os.path.join(self.challenge_dir, achall.chall.encode("token"))
self.configurator.reverter.register_file_creation(True, name)
with open(name, 'wb') as f:
f.write(validation.encode())
filesystem.chmod(name, 0o644)
return response
def _set_up_include_directives(self, vhost: VirtualHost) -> None:
"""Includes override configuration to the beginning and to the end of
VirtualHost. Note that this include isn't added to Augeas search tree"""
if vhost not in self.moded_vhosts:
logger.debug(
"Adding a temporary challenge validation Include for name: %s in: %s",
vhost.name, vhost.filep)
self.configurator.parser.add_dir_beginning(
vhost.path, "Include", self.challenge_conf_pre)
self.configurator.parser.add_dir(
vhost.path, "Include", self.challenge_conf_post)
if not vhost.enabled:
self.configurator.parser.add_dir(
get_aug_path(self.configurator.parser.loc["default"]),
"Include", vhost.filep)
self.moded_vhosts.add(vhost)
|