File: http_01_test.py

package info (click to toggle)
python-certbot-nginx 4.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 660 kB
  • sloc: python: 4,507; sh: 47; makefile: 11
file content (234 lines) | stat: -rw-r--r-- 10,230 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""Tests for certbot_nginx._internal.http_01"""
import sys
import unittest
from unittest import mock

import josepy as jose
import pytest

from acme import challenges
from acme import messages
from certbot import achallenges
from certbot.tests import acme_util
from certbot.tests import util as test_util
from certbot_nginx._internal.obj import Addr
from certbot_nginx._internal.tests import test_util as util

# RSA JWK loaded from the "rsa512_key.pem" test vector; used as the account key
# for the annotated challenges built in this module.
AUTH_KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))


class HttpPerformTest(util.NginxTest):
    """Test the NginxHttp01 challenge."""

    account_key = AUTH_KEY
    # Pending HTTP-01 challenges shared by the tests, referenced by index:
    #   0: www.example.com   1: ipv6.com   2: www.example.org
    #   3: migration.com     4: ipv6ssl.com
    # Entries 3 and 4 carry the same token value.
    achalls = [
        achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.chall_to_challb(
                challenges.HTTP01(token=b"kNdwjwOeX0I_A8DXt9Msmg"), messages.STATUS_PENDING),
            domain="www.example.com", account_key=account_key),
        achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.chall_to_challb(
                challenges.HTTP01(
                    token=b"\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y"
                          b"\x80\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945"
                ), messages.STATUS_PENDING),
            domain="ipv6.com", account_key=account_key),
        achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.chall_to_challb(
                challenges.HTTP01(
                    token=b"\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd"
                          b"\xeb9\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4"
                ), messages.STATUS_PENDING),
            domain="www.example.org", account_key=account_key),
        achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.chall_to_challb(
                challenges.HTTP01(token=b"kNdwjxOeX0I_A8DXt9Msmg"), messages.STATUS_PENDING),
            domain="migration.com", account_key=account_key),
        achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.chall_to_challb(
                challenges.HTTP01(token=b"kNdwjxOeX0I_A8DXt9Msmg"), messages.STATUS_PENDING),
            domain="ipv6ssl.com", account_key=account_key),
    ]

    def setUp(self):
        """Create the ``NginxHttp01`` under test from a fresh nginx configurator."""
        super().setUp()

        config = self.get_nginx_configurator(
            self.config_path, self.config_dir, self.work_dir, self.logs_dir)

        from certbot_nginx._internal import http_01
        self.http01 = http_01.NginxHttp01(config)

    def test_perform0(self):
        """perform() with no challenges added returns an empty list."""
        responses = self.http01.perform()
        assert [] == responses

    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.save")
    def test_perform1(self, mock_save):
        """A single challenge yields its response and exactly one save() call."""
        self.http01.add_chall(self.achalls[0])
        response = self.achalls[0].response(self.account_key)

        responses = self.http01.perform()

        assert [response] == responses
        assert mock_save.call_count == 1

    def test_perform2(self):
        """perform() returns one response per added challenge, in the order added."""
        acme_responses = []
        for achall in self.achalls:
            self.http01.add_chall(achall)
            acme_responses.append(achall.response(self.account_key))

        http_responses = self.http01.perform()

        # Derive the expected count from the fixture list so that adding or
        # removing an entry in ``achalls`` does not silently skip comparisons.
        assert len(http_responses) == len(self.achalls)
        for http_response, acme_response in zip(http_responses, acme_responses):
            assert http_response == acme_response

    def test_mod_config(self):
        """_mod_config() output can be saved and re-loaded without raising."""
        self.http01.add_chall(self.achalls[0])
        self.http01.add_chall(self.achalls[2])

        self.http01._mod_config()  # pylint: disable=protected-access

        self.http01.configurator.save()

        self.http01.configurator.parser.load()

        # TODO: walk ``parser.get_vhosts()`` and, for each vhost whose name
        # matches a challenged domain, check that the challenge location block
        # is present and correct. Until then this test only verifies that the
        # modify/save/load round trip completes without an exception.

    @mock.patch('certbot_nginx._internal.parser.NginxParser.add_server_directives')
    def test_mod_config_http_and_https(self, mock_add_server_directives):
        """A server_name with both HTTP and HTTPS vhosts should get modded in both vhosts"""
        self.configuration.https_port = 443
        self.http01.add_chall(self.achalls[3]) # migration.com
        self.http01._mod_config()  # pylint: disable=protected-access

        # Domain has an HTTP and HTTPS vhost
        # 2 * 'rewrite' + 2 * 'return 200 keyauthz' = 4
        assert mock_add_server_directives.call_count == 4

    @mock.patch('certbot_nginx._internal.parser.nginxparser.dump')
    @mock.patch('certbot_nginx._internal.parser.NginxParser.add_server_directives')
    def test_mod_config_only_https(self, mock_add_server_directives, mock_dump):
        """A server_name with only an HTTPS vhost should get modded"""
        self.http01.add_chall(self.achalls[4]) # ipv6ssl.com
        self.http01._mod_config() # pylint: disable=protected-access

        # It should modify the existing HTTPS vhost
        assert mock_add_server_directives.call_count == 2
        # since there was no suitable HTTP vhost or default HTTP vhost, a non-empty one
        # should have been created and written to the challenge conf file
        assert mock_dump.call_args[0][0] != []

    @mock.patch('certbot_nginx._internal.parser.NginxParser.add_server_directives')
    def test_mod_config_deduplicate(self, mock_add_server_directives):
        """A vhost that appears in both HTTP and HTTPS vhosts only gets modded once"""
        achall = achallenges.KeyAuthorizationAnnotatedChallenge(
            challb=acme_util.chall_to_challb(
                challenges.HTTP01(token=b"kNdwjxOeX0I_A8DXt9Msmg"), messages.STATUS_PENDING),
            domain="ssl.both.com", account_key=AUTH_KEY)
        self.http01.add_chall(achall)
        self.http01._mod_config() # pylint: disable=protected-access

        # 5 vhosts should be modded rather than 6, because two of the matching
        # vhosts are the same one; the expected total is 2 calls per vhost.
        assert mock_add_server_directives.call_count == 5*2

    def test_mod_config_insert_bucket_directive(self):
        """_mod_config() adds ``server_names_hash_bucket_size 128`` to nginx.conf."""
        nginx_conf = self.http01.configurator.parser.abs_path('nginx.conf')

        expected = ['server_names_hash_bucket_size', '128']
        # Precondition: the directive is not in the pristine configuration.
        original_conf = self.http01.configurator.parser.parsed[nginx_conf]
        assert not util.contains_at_depth(original_conf, expected, 2)

        self.http01.add_chall(self.achalls[0])
        self.http01._mod_config()  # pylint: disable=protected-access
        self.http01.configurator.save()
        self.http01.configurator.parser.load()

        generated_conf = self.http01.configurator.parser.parsed[nginx_conf]
        assert util.contains_at_depth(generated_conf, expected, 2)

    def test_mod_config_update_bucket_directive_in_included_file(self):
        """An existing bucket-size directive in an included file is updated in place.

        ``sites-enabled/example.com`` is temporarily prefixed with
        ``server_names_hash_bucket_size 64;``. After ``_mod_config()`` the value
        128 must appear in that file and must not have been added to nginx.conf.
        """
        # save old example.com config
        example_com_loc = self.http01.configurator.parser.abs_path('sites-enabled/example.com')
        with open(example_com_loc) as f:
            original_example_com = f.read()

        modified_example_com = 'server_names_hash_bucket_size 64;\n' + original_example_com
        try:
            # modify example.com config
            with open(example_com_loc, 'w') as f:
                f.write(modified_example_com)
            self.http01.configurator.parser.load()

            # run change
            self.http01.add_chall(self.achalls[0])
            self.http01._mod_config()  # pylint: disable=protected-access
            self.http01.configurator.save()
            self.http01.configurator.parser.load()

            # not in nginx.conf
            expected = ['server_names_hash_bucket_size', '128']
            nginx_conf_loc = self.http01.configurator.parser.abs_path('nginx.conf')
            nginx_conf = self.http01.configurator.parser.parsed[nginx_conf_loc]
            assert not util.contains_at_depth(nginx_conf, expected, 2)

            # is updated in example.com conf
            generated_conf = self.http01.configurator.parser.parsed[example_com_loc]
            assert util.contains_at_depth(generated_conf, expected, 0)
        finally:
            # put back example.com config even when an assertion above fails,
            # so the on-disk file and the parser state are not left modified
            with open(example_com_loc, 'w') as f:
                f.write(original_example_com)
            self.http01.configurator.parser.load()

    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
    def test_default_listen_addresses_no_memoization(self, ipv6_info):
        """Every _default_listen_addresses() call queries ipv6_info() again."""
        # pylint: disable=protected-access
        ipv6_info.return_value = (True, True)
        self.http01._default_listen_addresses()
        assert ipv6_info.call_count == 1
        ipv6_info.return_value = (False, False)
        self.http01._default_listen_addresses()
        assert ipv6_info.call_count == 2

    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
    def test_default_listen_addresses_t_t(self, ipv6_info):
        """ipv6_info() == (True, True) gives ``80`` and ``[::]:80``."""
        # pylint: disable=protected-access
        ipv6_info.return_value = (True, True)
        addrs = self.http01._default_listen_addresses()
        http_addr = Addr.fromstring("80")
        http_ipv6_addr = Addr.fromstring("[::]:80")
        assert addrs == [http_addr, http_ipv6_addr]

    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
    def test_default_listen_addresses_t_f(self, ipv6_info):
        """ipv6_info() == (True, False) gives ``80`` and ``[::]:80 ipv6only=on``."""
        # pylint: disable=protected-access
        ipv6_info.return_value = (True, False)
        addrs = self.http01._default_listen_addresses()
        http_addr = Addr.fromstring("80")
        http_ipv6_addr = Addr.fromstring("[::]:80 ipv6only=on")
        assert addrs == [http_addr, http_ipv6_addr]

    @mock.patch("certbot_nginx._internal.configurator.NginxConfigurator.ipv6_info")
    def test_default_listen_addresses_f_f(self, ipv6_info):
        """ipv6_info() == (False, False) gives only ``80``."""
        # pylint: disable=protected-access
        ipv6_info.return_value = (False, False)
        addrs = self.http01._default_listen_addresses()
        http_addr = Addr.fromstring("80")
        assert addrs == [http_addr]

if __name__ == "__main__":
    # Run this file's tests under pytest when executed as a script, forwarding
    # any command-line arguments and exiting with pytest's return code.
    sys.exit(pytest.main([*sys.argv[1:], __file__]))  # pragma: no cover