1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
|
"""Tests for certbot_nginx._internal.http_01"""
import sys
import unittest
from unittest import mock
import josepy as jose
import pytest
from acme import challenges
from acme import messages
from certbot import achallenges
from certbot.tests import acme_util
from certbot.tests import util as test_util
from certbot_nginx._internal.obj import Addr
from certbot_nginx._internal.tests import test_util as util
# RSA account key loaded from the "rsa512_key.pem" test vector; it is the
# account key passed to every annotated challenge built in this module.
AUTH_KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
class HttpPerformTest(util.NginxTest):
    """Test the NginxHttp01 challenge.

    Each test starts from a fresh ``NginxHttp01`` instance (``self.http01``)
    built in :meth:`setUp` around a configurator obtained from
    ``get_nginx_configurator``.
    """

    account_key = AUTH_KEY

    # Pending HTTP-01 challenges, one per domain exercised by the tests below.
    # Tests refer to them by index, so the order must not change.
    achalls = [
        achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.chall_to_challb(
                challenges.HTTP01(token=b"kNdwjwOeX0I_A8DXt9Msmg"), messages.STATUS_PENDING),
            domain="www.example.com", account_key=account_key),
        achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.chall_to_challb(
                challenges.HTTP01(
                    token=b"\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y"
                          b"\x80\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945"
                ), messages.STATUS_PENDING),
            domain="ipv6.com", account_key=account_key),
        achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.chall_to_challb(
                challenges.HTTP01(
                    token=b"\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd"
                          b"\xeb9\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4"
                ), messages.STATUS_PENDING),
            domain="www.example.org", account_key=account_key),
        achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.chall_to_challb(
                challenges.HTTP01(token=b"kNdwjxOeX0I_A8DXt9Msmg"), messages.STATUS_PENDING),
            domain="migration.com", account_key=account_key),
        achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.chall_to_challb(
                challenges.HTTP01(token=b"kNdwjxOeX0I_A8DXt9Msmg"), messages.STATUS_PENDING),
            domain="ipv6ssl.com", account_key=account_key),
    ]

    def setUp(self):
        """Build the configurator and the ``NginxHttp01`` object under test."""
        super().setUp()

        config = self.get_nginx_configurator(
            self.config_path, self.config_dir, self.work_dir, self.logs_dir)

        # Imported here rather than at module level, as in the original test.
        from certbot_nginx._internal import http_01
        self.http01 = http_01.NginxHttp01(config)

    def test_perform0(self):
        """perform() with no challenges added returns an empty list."""
        responses = self.http01.perform()
        assert [] == responses

    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.save")
    def test_perform1(self, mock_save):
        """A single challenge yields its response and exactly one save() call."""
        self.http01.add_chall(self.achalls[0])
        response = self.achalls[0].response(self.account_key)

        responses = self.http01.perform()

        assert [response] == responses
        assert mock_save.call_count == 1

    def test_perform2(self):
        """perform() returns one response per challenge, in insertion order."""
        acme_responses = []
        for achall in self.achalls:
            self.http01.add_chall(achall)
            acme_responses.append(achall.response(self.account_key))

        http_responses = self.http01.perform()

        # Derive the expected count from the fixture instead of hard-coding it,
        # so adding a challenge to ``achalls`` cannot silently go unchecked.
        assert len(http_responses) == len(self.achalls)
        for http_response, acme_response in zip(http_responses, acme_responses):
            assert http_response == acme_response

    def test_mod_config(self):
        """_mod_config(), save() and a parser reload complete without raising."""
        self.http01.add_chall(self.achalls[0])
        self.http01.add_chall(self.achalls[2])

        self.http01._mod_config()  # pylint: disable=protected-access

        self.http01.configurator.save()
        self.http01.configurator.parser.load()

        # NOTE: this is currently a smoke test -- it makes no assertions about
        # the resulting configuration.
        # TODO: walk self.http01.configurator.parser.get_vhosts() and, for each
        # vhost whose name matches a challenge domain, check that the challenge
        # location block is present and correct.

    @mock.patch('certbot_nginx._internal.parser.NginxParser.add_server_directives')
    def test_mod_config_http_and_https(self, mock_add_server_directives):
        """A server_name with both HTTP and HTTPS vhosts should get modded in both vhosts"""
        self.configuration.https_port = 443
        self.http01.add_chall(self.achalls[3])  # migration.com
        self.http01._mod_config()  # pylint: disable=protected-access

        # Domain has an HTTP and HTTPS vhost
        # 2 * 'rewrite' + 2 * 'return 200 keyauthz' = 4
        assert mock_add_server_directives.call_count == 4

    @mock.patch('certbot_nginx._internal.parser.nginxparser.dump')
    @mock.patch('certbot_nginx._internal.parser.NginxParser.add_server_directives')
    def test_mod_config_only_https(self, mock_add_server_directives, mock_dump):
        """A server_name with only an HTTPS vhost should get modded"""
        self.http01.add_chall(self.achalls[4])  # ipv6ssl.com
        self.http01._mod_config()  # pylint: disable=protected-access

        # It should modify the existing HTTPS vhost
        assert mock_add_server_directives.call_count == 2
        # since there was no suitable HTTP vhost or default HTTP vhost, a non-empty one
        # should have been created and written to the challenge conf file
        assert mock_dump.call_args[0][0] != []

    @mock.patch('certbot_nginx._internal.parser.NginxParser.add_server_directives')
    def test_mod_config_deduplicate(self, mock_add_server_directives):
        """A vhost that appears in both HTTP and HTTPS vhosts only gets modded once"""
        achall = achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.chall_to_challb(
                challenges.HTTP01(token=b"kNdwjxOeX0I_A8DXt9Msmg"), messages.STATUS_PENDING),
            domain="ssl.both.com", account_key=AUTH_KEY)
        self.http01.add_chall(achall)
        self.http01._mod_config()  # pylint: disable=protected-access

        # 5 vhosts should be modded rather than 6, because two of the matched
        # vhosts are the same one; each modded vhost accounts for two
        # add_server_directives calls.
        assert mock_add_server_directives.call_count == 5*2

    def test_mod_config_insert_bucket_directive(self):
        """_mod_config() adds server_names_hash_bucket_size 128 to nginx.conf."""
        nginx_conf = self.http01.configurator.parser.abs_path('nginx.conf')

        expected = ['server_names_hash_bucket_size', '128']
        original_conf = self.http01.configurator.parser.parsed[nginx_conf]
        assert not util.contains_at_depth(original_conf, expected, 2)

        self.http01.add_chall(self.achalls[0])
        self.http01._mod_config()  # pylint: disable=protected-access
        self.http01.configurator.save()
        self.http01.configurator.parser.load()

        generated_conf = self.http01.configurator.parser.parsed[nginx_conf]
        assert util.contains_at_depth(generated_conf, expected, 2)

    def test_mod_config_update_bucket_directive_in_included_file(self):
        """An existing bucket-size directive in an included file is updated in place.

        ``sites-enabled/example.com`` is temporarily prefixed with
        ``server_names_hash_bucket_size 64;``.  After ``_mod_config()`` the
        directive must read ``128`` in that file and must not have been added
        to ``nginx.conf``.
        """
        # save old example.com config
        example_com_loc = self.http01.configurator.parser.abs_path('sites-enabled/example.com')
        with open(example_com_loc) as f:
            original_example_com = f.read()

        # The restore lives in ``finally`` so that a failing assertion cannot
        # leave the modified file behind.
        try:
            # modify example.com config
            modified_example_com = 'server_names_hash_bucket_size 64;\n' + original_example_com
            with open(example_com_loc, 'w') as f:
                f.write(modified_example_com)
            self.http01.configurator.parser.load()

            # run change
            self.http01.add_chall(self.achalls[0])
            self.http01._mod_config()  # pylint: disable=protected-access
            self.http01.configurator.save()
            self.http01.configurator.parser.load()

            # not in nginx.conf
            expected = ['server_names_hash_bucket_size', '128']
            nginx_conf_loc = self.http01.configurator.parser.abs_path('nginx.conf')
            nginx_conf = self.http01.configurator.parser.parsed[nginx_conf_loc]
            assert not util.contains_at_depth(nginx_conf, expected, 2)

            # is updated in example.com conf
            generated_conf = self.http01.configurator.parser.parsed[example_com_loc]
            assert util.contains_at_depth(generated_conf, expected, 0)
        finally:
            # put back example.com config
            with open(example_com_loc, 'w') as f:
                f.write(original_example_com)
            self.http01.configurator.parser.load()

    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
    def test_default_listen_addresses_no_memoization(self, ipv6_info):
        """Every _default_listen_addresses() call queries ipv6_info again."""
        # pylint: disable=protected-access
        ipv6_info.return_value = (True, True)
        self.http01._default_listen_addresses()
        assert ipv6_info.call_count == 1

        ipv6_info.return_value = (False, False)
        self.http01._default_listen_addresses()
        assert ipv6_info.call_count == 2

    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
    def test_default_listen_addresses_t_t(self, ipv6_info):
        """ipv6_info == (True, True) gives "80" and a plain "[::]:80"."""
        # pylint: disable=protected-access
        ipv6_info.return_value = (True, True)
        addrs = self.http01._default_listen_addresses()

        http_addr = Addr.fromstring("80")
        http_ipv6_addr = Addr.fromstring("[::]:80")
        assert addrs == [http_addr, http_ipv6_addr]

    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
    def test_default_listen_addresses_t_f(self, ipv6_info):
        """ipv6_info == (True, False) gives "80" and "[::]:80 ipv6only=on"."""
        # pylint: disable=protected-access
        ipv6_info.return_value = (True, False)
        addrs = self.http01._default_listen_addresses()

        http_addr = Addr.fromstring("80")
        http_ipv6_addr = Addr.fromstring("[::]:80 ipv6only=on")
        assert addrs == [http_addr, http_ipv6_addr]

    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
    def test_default_listen_addresses_f_f(self, ipv6_info):
        """ipv6_info == (False, False) gives only the "80" address."""
        # pylint: disable=protected-access
        ipv6_info.return_value = (False, False)
        addrs = self.http01._default_listen_addresses()

        http_addr = Addr.fromstring("80")
        assert addrs == [http_addr]
# Allow running this test module directly: forward any command-line arguments
# to pytest, followed by this file's own path, and exit with pytest's status.
if __name__ == "__main__":
    sys.exit(pytest.main([*sys.argv[1:], __file__]))  # pragma: no cover
|