# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
#
# SPDX-License-Identifier: MIT
import json
import tempfile
import shutil
import subprocess
import unittest
import sys
import aiocoap
from .test_server import WithClient, WithTestServer
from .fixtures import no_warnings
from .common import tcp_disabled, run_fixture_as_standalone_server
# Flipped to True by the ``__main__`` guard at the bottom of this file. When
# set, WithTLSServer.asyncSetUp prints an aiocoap-client command line that can
# be used to talk to the running fixture by hand.
IS_STANDALONE = False
class WithTLSServer(WithTestServer):
    """Server fixture that generates a throwaway self-signed TLS key pair.

    During set-up a temporary directory (``self.keydir``) is created and
    populated with:

    * ``self.keyfile`` / ``self.certfile`` -- an RSA key and a self-signed
      certificate whose CN is ``self.servernamealias``, produced by the
      ``openssl`` command line tool;
    * ``self.credentialsfile`` -- a JSON credentials map pointing at the
      certificate, written for standalone clients used while debugging.

    The directory is removed again in tear-down, and also when set-up itself
    fails part-way.
    """

    async def asyncSetUp(self):
        """Create the key material, then run the parent set-up.

        Raises:
            subprocess.CalledProcessError: if ``openssl`` exits non-zero.
            OSError: if ``openssl`` cannot be started or the credentials
                file cannot be written.
        """
        self.keydir = tempfile.mkdtemp(suffix="-testkeypair")
        try:
            self.keyfile = self.keydir + "/key.pem"
            self.certfile = self.keydir + "/cert.pem"
            self.credentialsfile = self.keydir + "/credentials.json"

            subprocess.check_call(
                [
                    "openssl",
                    "req",
                    "-x509",
                    "-newkey",
                    "rsa:4096",
                    "-keyout",
                    self.keyfile,
                    "-out",
                    self.certfile,
                    "-days",
                    "5",
                    "-nodes",
                    "-subj",
                    "/CN=%s" % self.servernamealias,
                ],
                stderr=subprocess.DEVNULL,
            )

            # Write out for the benefit of standalone clients during debugging
            with open(self.credentialsfile, "w") as of:
                json.dump(
                    {
                        "coaps+tcp://%s/*" % self.servernamealias: {
                            "tlscert": {"certfile": self.certfile}
                        }
                    },
                    of,
                )

            if IS_STANDALONE:
                print(
                    "To test, run ./aiocoap-client coaps+tcp://%s/whoami --credentials %s"
                    % (
                        self.servernamealias,
                        self.credentialsfile,
                    )
                )

            await super().asyncSetUp()
        except BaseException:
            # unittest does not run the tear-down hooks when set-up raises, so
            # the temporary directory would otherwise be left behind. Errors
            # during this cleanup are ignored so that they cannot mask the
            # original failure.
            shutil.rmtree(self.keydir, ignore_errors=True)
            raise

    async def asyncTearDown(self):
        """Run the parent tear-down, then delete the temporary key directory."""
        try:
            await super().asyncTearDown()
        finally:
            # Remove the key material even if the parent tear-down failed.
            shutil.rmtree(self.keydir)

    def get_server_ssl_context(self):
        """Return a server-side SSL context using the generated key pair.

        The context advertises the ``coap`` ALPN protocol, and its SNI
        callback stores the server name indicated by the client on the SSL
        object as ``indicated_server_name``.
        """
        import ssl

        # FIXME: copied from aiocoap.cli.common
        ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_context.load_cert_chain(certfile=self.certfile, keyfile=self.keyfile)
        ssl_context.set_alpn_protocols(["coap"])
        ssl_context.sni_callback = lambda obj, name, context: setattr(
            obj, "indicated_server_name", name
        )
        return ssl_context
class WithTLSClient(WithClient):
    """Client fixture that registers the test certificate for coaps+tcp.

    This expects that something -- typically the colocated WithTestServer --
    has set ``self.certfile`` before ``asyncSetUp`` runs.
    """

    async def asyncSetUp(self):
        """Run the parent set-up, then add a TLSCert credential to the client."""
        await super().asyncSetUp()

        # Nothing here needs to be awaited, but WithClient only creates
        # self.client in asyncSetUp (which apparently runs after setUp), so
        # the credentials have to be installed from asyncSetUp as well.
        server_cert = aiocoap.credentials.TLSCert(certfile=self.certfile)
        uri_pattern = "coaps+tcp://%s/*" % self.servernamealias
        self.client.client_credentials[uri_pattern] = server_cert
@unittest.skipIf(tcp_disabled, "TCP disabled in environment")
class TestTLS(WithTLSServer, WithTLSClient):
    """Round-trip test between the TLS server and TLS client fixtures."""

    @no_warnings
    async def test_tls(self):
        # GET /whoami over coaps+tcp and check that the URI the server
        # reports back carries the server name alias.
        whoami_uri = "coaps+tcp://%s/whoami" % self.servernamealias

        get_request = aiocoap.Message(code=aiocoap.GET)
        # NOTE(review): set_uri_host=False presumably keeps the host name out
        # of the CoAP options so the server can only learn it via SNI --
        # confirm against the aiocoap documentation.
        get_request.set_request_uri(whoami_uri, set_uri_host=False)

        reply = await self.client.request(get_request).response_raising
        whoami = json.loads(reply.payload)

        self.assertEqual(
            whoami["requested_uri"],
            whoami_uri,
            "SNI name was not used by the server",
        )
if __name__ == "__main__":
    # Standalone mode: keep the TestTLS fixture's server running for manual
    # experiments, with debug logging enabled.
    #
    # Because of the relative imports at the top of this file, it has to be
    # run as a module (`python3 -m ...`), not by script path.
    # NOTE(review): the previous comment here named `tests.test_server`;
    # confirm the module name that applies to this file.
    import logging

    logging.basicConfig(level=logging.DEBUG)

    # Makes WithTLSServer.asyncSetUp print a ready-made client command line.
    IS_STANDALONE = True

    run_fixture_as_standalone_server(TestTLS)
|