1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
|
import copy
import pytest
import re
from configuration import available_ports, TLS13_CIPHERS, ALL_TEST_CURVES, MINIMAL_TEST_CERTS
from common import ProviderOptions, Protocols, data_bytes
from fixtures import managed_process # lgtm [py/unused-import]
from providers import Provider, S2N, OpenSSL
from utils import invalid_test_parameters, get_parameter_name, get_expected_s2n_version, to_bytes
PADDING_SIZE_SMALL = 250
PADDING_SIZE_MEDIUM = 1000
PADDING_SIZE_MAX = 1 << 14
PADDING_SIZES = [
PADDING_SIZE_SMALL,
PADDING_SIZE_MEDIUM,
PADDING_SIZE_MAX
]
# arbitrarily large payload size
PAYLOAD_SIZE = 1024
OPENSSL_RECORD_WRITTEN_PATTERN = r"write to .*?\\n(.*?)\\n"
OPENSSL_APP_DATA_HEADER_PATTERN = r"17 03 03 ([0-9a-f]{2} [0-9a-f]{2})"
RECORD_SIZE_GROUP = 1
def strip_string_of_bytes(s: str) -> str:
# s has the form `b'<>'`. We need to strip the literal `b'` and the last `'`
return s[2:-1]
def get_payload_size_from_openssl_trace(record_size_bytes: str) -> int:
# record_size_bytes is in the form XX XX where X is a hex digit
size_in_hex = record_size_bytes.replace(' ', '')
size = int(size_in_hex, 16)
# record includes 16 bytes of aead tag
return size - 16
def assert_openssl_records_are_padded_correctly(openssl_output: str, padding_size: int):
number_of_app_data_records = 0
records_written = re.findall(
OPENSSL_RECORD_WRITTEN_PATTERN, openssl_output)
for record_prefix in records_written:
app_data_header = re.search(
OPENSSL_APP_DATA_HEADER_PATTERN, record_prefix)
if app_data_header:
size_bytes = app_data_header.group(RECORD_SIZE_GROUP)
size = get_payload_size_from_openssl_trace(size_bytes)
assert size > 0
assert size % padding_size == 0
number_of_app_data_records += 1
# The client and server write a variable number of encrypted handshake records,
# but each write at least one (Finished). We also send at least one ApplicationData record.
assert number_of_app_data_records >= 2
def test_nothing():
"""
Sometimes the record padding test parameters in combination with the s2n libcrypto
results in no test cases existing. In this case, pass a nothing test to avoid
marking the entire codebuild run as failed.
"""
assert True
@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", TLS13_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("provider", [OpenSSL])
@pytest.mark.parametrize("curve", ALL_TEST_CURVES, ids=get_parameter_name)
# only tls 1.3 supports record padding
@pytest.mark.parametrize("protocol", [Protocols.TLS13], ids=get_parameter_name)
@pytest.mark.parametrize("certificate", MINIMAL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("padding_size", PADDING_SIZES, ids=get_parameter_name)
def test_s2n_server_handles_padded_records(managed_process, cipher, provider, curve, protocol, certificate,
padding_size):
port = next(available_ports)
random_bytes = data_bytes(PAYLOAD_SIZE)
client_options = ProviderOptions(
mode=Provider.ClientMode,
port=port,
cipher=cipher,
cert=certificate.cert,
curve=curve,
data_to_send=random_bytes,
insecure=True,
protocol=protocol,
extra_flags=['-record_padding', padding_size]
)
server_options = copy.copy(client_options)
server_options.data_to_send = None
server_options.mode = Provider.ServerMode
server_options.key = certificate.key
server_options.extra_flags = None
s2nd = managed_process(S2N, server_options, timeout=5)
openssl = managed_process(provider, client_options, timeout=5)
for client_results in openssl.get_results():
client_results.assert_success()
assert_openssl_records_are_padded_correctly(
str(client_results.stdout), padding_size)
expected_version = get_expected_s2n_version(protocol, provider)
for server_results in s2nd.get_results():
server_results.assert_success()
# verify that the payload was correctly received by the server
assert random_bytes in server_results.stdout
# verify that the version was correctly negotiated
assert to_bytes("Actual protocol version: {}".format(
expected_version)) in server_results.stdout
# verify that the cipher was correctly negotiated
assert to_bytes("Cipher negotiated: {}".format(
cipher.name)) in server_results.stdout
@pytest.mark.uncollect_if(func=invalid_test_parameters)
@pytest.mark.parametrize("cipher", TLS13_CIPHERS, ids=get_parameter_name)
@pytest.mark.parametrize("provider", [OpenSSL])
@pytest.mark.parametrize("curve", ALL_TEST_CURVES, ids=get_parameter_name)
# only tls 1.3 supports record padding
@pytest.mark.parametrize("protocol", [Protocols.TLS13], ids=get_parameter_name)
@pytest.mark.parametrize("certificate", MINIMAL_TEST_CERTS, ids=get_parameter_name)
@pytest.mark.parametrize("padding_size", PADDING_SIZES, ids=get_parameter_name)
def test_s2n_client_handles_padded_records(managed_process, cipher, provider, curve, protocol, certificate,
padding_size):
port = next(available_ports)
client_random_bytes = data_bytes(PAYLOAD_SIZE)
server_random_bytes = data_bytes(PAYLOAD_SIZE)
server_options = ProviderOptions(
mode=Provider.ServerMode,
port=port,
cipher=cipher,
curve=curve,
cert=certificate.cert,
key=certificate.key,
insecure=True,
protocol=protocol,
data_to_send=server_random_bytes,
extra_flags=['-record_padding', padding_size]
)
client_options = copy.copy(server_options)
client_options.mode = Provider.ClientMode
client_options.extra_flags = None
client_options.data_to_send = client_random_bytes
# openssl will send its response after it has received s2nc's record
openssl = managed_process(provider, server_options,
timeout=5, send_marker=strip_string_of_bytes(str(client_random_bytes)))
# s2nc will wait until it has received the server's response before closing
s2nc = managed_process(S2N, client_options, timeout=5,
close_marker=strip_string_of_bytes(str(server_random_bytes)))
expected_version = get_expected_s2n_version(protocol, provider)
for client_results in s2nc.get_results():
client_results.assert_success()
# assert that the client has received server's application payload
assert server_random_bytes in client_results.stdout
assert to_bytes("Actual protocol version: {}".format(
expected_version)) in client_results.stdout
assert to_bytes("Cipher negotiated: {}".format(
cipher.name)) in client_results.stdout
for server_results in openssl.get_results():
server_results.assert_success()
assert_openssl_records_are_padded_correctly(
str(server_results.stdout), padding_size)
|