File: tests_tls_netaccess.uts

package info (click to toggle)
scapy 2.5.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 12,136 kB
  • sloc: python: 127,773; sh: 133; makefile: 11
file content (378 lines) | stat: -rw-r--r-- 12,426 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
% TLS session establishment tests

~ crypto needs_root

# More information at http://www.secdev.org/projects/UTscapy/

############
############
+ TLS server automaton tests
~ server

= Load server util functions
~ client

from __future__ import print_function

import sys, os, re, time, subprocess
from scapy.libs.six.moves.queue import Queue
import threading

from ast import literal_eval
import os
import sys
from contextlib import contextmanager
from scapy.autorun import StringWriter

from scapy.libs import six

from scapy.config import conf
from scapy.layers.tls.automaton_srv import TLSServerAutomaton

# Global scapy settings applied before any test of this campaign runs.
# NOTE(review): values look chosen for maximum diagnostics in test logs -
# confirm their exact meaning against scapy's conf documentation.
conf.verb = 4
conf.debug_tls = True  
conf.debug_dissector = 2
# Load the "tls" layer; load_layer is not imported in this file and is
# presumably provided by the UTScapy/scapy session - TODO confirm.
load_layer("tls")

@contextmanager
def captured_output():
    """Context manager that swaps sys.stdout / sys.stderr for StringWriter objects.
    Yields the ``(stdout, stderr)`` replacement pair; the original streams are
    put back when the ``with`` block exits, whether or not it raised.
    """
    saved_streams = (sys.stdout, sys.stderr)
    # Both writers receive the original stdout as their ``debug`` stream.
    capture_out = StringWriter(debug=saved_streams[0])
    capture_err = StringWriter(debug=saved_streams[0])
    try:
        sys.stdout = capture_out
        sys.stderr = capture_err
        yield sys.stdout, sys.stderr
    finally:
        sys.stdout, sys.stderr = saved_streams

def check_output_for_data(out, err, expected_data):
    """Look for ``expected_data`` in the "> Received: ..." lines of captured output.
    :param out: captured stdout object exposing its text as ``.s``
    :param err: captured stderr object exposing its text as ``.s``
    :param expected_data: payload to look for (converted with ``plain_str``)
    :returns: ``(False, stderr_text)`` if anything was written to stderr,
        ``(False, None)`` if ``expected_data`` is empty,
        ``(True, payload)`` for the first received payload containing the data,
        ``(False, stdout_text)`` when no payload matches.
    """
    stderr_text = err.s.strip()
    if stderr_text:
        # Any stderr content counts as a failure, whatever stdout contains
        return (False, stderr_text)
    stdout_text = out.s.strip()
    if not expected_data:
        return (False, None)
    expected_data = plain_str(expected_data)
    print("Testing for output: '%s'" % expected_data)
    received_re = re.compile(r"> Received: b?'([^']*)'")
    # With a single group, findall() returns the quoted payloads directly
    for payload in received_re.findall(stdout_text):
        print("Found: %s" % payload)
        if expected_data in payload:
            return (True, payload)
    return (False, stdout_text)


def run_tls_test_server(expected_data, q, curve=None, cookie=False, client_auth=False,
                        psk=None, handle_session_ticket=False):
    """Run a TLSServerAutomaton with the test PKI and report what it received.
    Intended as a thread target. Two items are put on ``q``: ``True`` once the
    automaton has been built, then the tuple returned by
    ``check_output_for_data`` after the automaton's ``run()`` returns.
    :param expected_data: payload expected in the server's captured output
    :param q: queue used to synchronise with, and report to, the caller
    :param curve: forwarded to TLSServerAutomaton
    :param cookie: forwarded to TLSServerAutomaton
    :param client_auth: forwarded to TLSServerAutomaton
    :param psk: when set, passed as ``psk`` together with ``psk_mode="psk_dhe_ke"``
    :param handle_session_ticket: forwarded to TLSServerAutomaton
    """
    print("Server started !")
    with captured_output() as (out, err):
        # Locate the server certificate and key, and make sure both exist
        mycert = scapy_path("/test/tls/pki/srv_cert.pem")
        mykey = scapy_path("/test/tls/pki/srv_key.pem")
        print(mykey)
        print(mycert)
        assert os.path.exists(mycert)
        assert os.path.exists(mykey)
        # PSK options are only handed to the automaton when a PSK was given
        psk_options = {"psk": psk, "psk_mode": "psk_dhe_ke"} if psk else {}
        server = TLSServerAutomaton(mycert=mycert,
                                    mykey=mykey,
                                    curve=curve,
                                    cookie=cookie,
                                    client_auth=client_auth,
                                    handle_session_ticket=handle_session_ticket,
                                    debug=5,
                                    **psk_options)
        # Tell the waiting thread that the automaton is ready
        q.put(True)
        server.run()
        # Inspect the output while the capture is still active
        result = check_output_for_data(out, err, expected_data)
    q.put(result)

def run_openssl_client(msg, suite="", version="", tls13=False, client_auth=False,
                       psk=None, sess_out=None):
    """Run ``openssl s_client`` against 127.0.0.1:4433 and check its verdict.
    The command is passed to ``subprocess.Popen`` as an argument list (no
    shell), so file paths containing spaces or shell metacharacters are safe.
    :param msg: bytes written to the client's stdin; a ``stop_server`` line
        is appended to it
    :param suite: cipher suite name; given with ``-ciphersuites`` when
        ``tls13`` is true, with ``-cipher`` otherwise. Omitted when empty.
    :param version: protocol option(s) such as ``-tls1_2``; split on
        whitespace, omitted when empty
    :param tls13: selects the option used to pass ``suite``
    :param client_auth: when true, add the test client certificate and key
    :param psk: when set, passed with ``-psk``
    :param sess_out: when set, passed with ``-sess_out``
    :raises RuntimeError: if openssl exits with a non-zero status, or if the
        first ``verify return:`` line of its output is missing or is not 1
    """
    CA_f = scapy_path("/test/tls/pki/ca_cert.pem")
    mycert = scapy_path("/test/tls/pki/cli_cert.pem")
    mykey = scapy_path("/test/tls/pki/cli_key.pem")
    args = ["openssl", "s_client", "-connect", "127.0.0.1:4433", "-debug"]
    # Empty options must be left out: in an argument list they would reach
    # openssl as empty arguments instead of vanishing as in a shell string.
    if suite:
        args.extend(["-ciphersuites" if tls13 else "-cipher", suite])
    args.extend(version.split())
    args.extend(["-CAfile", CA_f])
    if client_auth:
        args.extend(["-cert", mycert, "-key", mykey])
    if psk:
        args.extend(["-psk", str(psk)])
    if sess_out:
        args.extend(["-sess_out", sess_out])
    proc = subprocess.Popen(
        args,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
    )
    msg += b"\nstop_server\n"
    out = proc.communicate(input=msg)[0]
    print(plain_str(out))
    if proc.returncode != 0:
        raise RuntimeError("OpenSSL returned with error code %s" % proc.returncode)
    # Only the first "verify return:" line decides the outcome
    first_verify = re.search(br'verify return:(\d+)', out)
    if first_verify is None or first_verify.group(1) != b"1":
        raise RuntimeError("OpenSSL returned unexpected values")

def test_tls_server(suite="", version="", tls13=False, client_auth=False, psk=None):
    """Check the scapy TLS server against an ``openssl s_client`` connection.
    Starts ``run_tls_test_server`` in a daemon thread, waits for its ready
    signal, runs ``run_openssl_client`` and asserts on the server's result.
    :param suite: cipher suite name, also embedded in the test payload
    :param version: protocol option forwarded to the openssl client
    :param tls13: forwarded to the openssl client
    :param client_auth: forwarded to both the server and the openssl client
    :param psk: forwarded to both the server and the openssl client
    :raises RuntimeError: if the server thread is still alive 5 seconds after
        the client finished, or if it left no result on the queue
    """
    msg = ("TestS_%s_data" % suite).encode()
    # Run server
    q_ = Queue()
    th_ = threading.Thread(target=run_tls_test_server, args=(msg, q_),
                           kwargs={"curve": None, "cookie": False, "client_auth": client_auth, "psk": psk},
                           name="test_tls_server %s %s" % (suite, version))
    # The daemon attribute replaces the deprecated setDaemon() call
    th_.daemon = True
    th_.start()
    # Synchronise threads. The wait is bounded so that a server thread dying
    # before it signals makes the test fail instead of hanging forever.
    assert q_.get(timeout=5) is True
    time.sleep(1)
    # Run openssl client
    run_openssl_client(msg, suite=suite, version=version, tls13=tls13, client_auth=client_auth, psk=psk)
    # Wait for server
    th_.join(5)
    if th_.is_alive():
        raise RuntimeError("Test timed out")
    # Analyse values
    if q_.empty():
        raise RuntimeError("Missing return values")
    ret = q_.get(timeout=5)
    print(ret)
    assert ret[0]


= Testing TLS server with TLS 1.0 and TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA
~ open_ssl_client

test_tls_server("ECDHE-RSA-AES128-SHA", "-tls1")

= Testing TLS server with TLS 1.1 and TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA
~ open_ssl_client

test_tls_server("ECDHE-RSA-AES128-SHA", "-tls1_1")

= Testing TLS server with TLS 1.2 and TLS_DHE_RSA_WITH_AES_128_CBC_SHA256
~ open_ssl_client

test_tls_server("DHE-RSA-AES128-SHA256", "-tls1_2")

= Testing TLS server with TLS 1.2 and TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
~ open_ssl_client

test_tls_server("ECDHE-RSA-AES256-GCM-SHA384", "-tls1_2")

= Testing TLS server with TLS 1.3 and TLS_AES_256_GCM_SHA384
~ open_ssl_client

test_tls_server("TLS_AES_256_GCM_SHA384", "-tls1_3", tls13=True)

= Testing TLS server with TLS 1.3 and TLS_AES_256_GCM_SHA384 and client auth
~ open_ssl_client

test_tls_server("TLS_AES_256_GCM_SHA384", "-tls1_3", tls13=True, client_auth=True)

= Testing TLS server with TLS 1.3 and ECDHE-PSK-AES256-CBC-SHA384 and PSK
~ open_ssl_client

test_tls_server("ECDHE-PSK-AES256-CBC-SHA384", "-tls1_3", tls13=False, psk="1a2b3c4d")

+ TLS client automaton tests
~ client

= Load client utils functions

import sys, os, time, threading

from scapy.layers.tls.automaton_cli import TLSClientAutomaton
from scapy.layers.tls.handshake import TLSClientHello, TLS13ClientHello

from scapy.libs.six.moves.queue import Queue

send_data = cipher_suite_code = version = None

def run_tls_test_client(send_data=None, cipher_suite_code=None, version=None,
                        client_auth=False, key_update=False, stop_server=True,
                        session_ticket_file_out=None, session_ticket_file_in=None):
    """Build and run a TLSClientAutomaton that sends a list of commands.
    The command list is ``send_data``, then optionally ``key_update``,
    ``stop_server`` and ``wait``, and always ends with ``quit``.
    :param send_data: first item of the command list
    :param cipher_suite_code: hexadecimal string of the cipher suite to offer
        (not used when ``version`` is "0002")
    :param version: hexadecimal string; "0002" selects the sslv2 automaton,
        "0304" the tls13 one, anything else is parsed as the ClientHello version
    :param client_auth: when true, use the test client certificate and key
    :param key_update: when true, add the ``key_update`` command
    :param stop_server: when true, add the ``stop_server`` command
    :param session_ticket_file_out: forwarded to the automaton; when set, a
        ``wait`` command is added as well
    :param session_ticket_file_in: forwarded to the automaton
    """
    print("Loading client...")
    mycert = mykey = None
    if client_auth:
        mycert = scapy_path("/test/tls/pki/cli_cert.pem")
        mykey = scapy_path("/test/tls/pki/cli_key.pem")
    # (condition, command) pairs, in the order the commands must be sent
    optional_commands = (
        (key_update, b"key_update"),
        (stop_server, b"stop_server"),
        (session_ticket_file_out, b"wait"),
    )
    commands = [send_data]
    commands.extend(cmd for enabled, cmd in optional_commands if enabled)
    commands.append(b"quit")
    # Arguments shared by every flavour of the automaton
    automaton_args = dict(data=commands, debug=5, mycert=mycert, mykey=mykey,
                          session_ticket_file_in=session_ticket_file_in,
                          session_ticket_file_out=session_ticket_file_out)
    if version == "0002":
        automaton_args["version"] = "sslv2"
    elif version == "0304":
        automaton_args["client_hello"] = TLS13ClientHello(ciphers=int(cipher_suite_code, 16))
        automaton_args["version"] = "tls13"
    else:
        automaton_args["client_hello"] = TLSClientHello(version=int(version, 16),
                                                        ciphers=int(cipher_suite_code, 16))
    t = TLSClientAutomaton(**automaton_args)
    print("Running client...")
    t.run()

def test_tls_client(suite, version, curve=None, cookie=False, client_auth=False,
                    key_update=False, sess_in_out=False):
    """Check the scapy TLS client against the scapy TLS server.
    Starts ``run_tls_test_server`` in a daemon thread, waits for its ready
    signal, runs ``run_tls_test_client`` (twice when ``sess_in_out`` is set)
    and asserts on the server's result.
    :param suite: hexadecimal cipher suite code, also embedded in the payload
    :param version: hexadecimal protocol version string
    :param curve: forwarded to the server automaton
    :param cookie: forwarded to the server automaton
    :param client_auth: forwarded to both the server and the client
    :param key_update: forwarded to the client
    :param sess_in_out: when true, the server handles session tickets and the
        client runs twice: once writing a session file, once reading it back
    :raises RuntimeError: if the server thread is still alive 5 seconds after
        the client finished, or if it left no result on the queue
    """
    msg = ("TestC_%s_data" % suite).encode()
    # Run server
    q_ = Queue()
    print("Starting server...")
    # curve and cookie are passed through to the server; hard-coding them to
    # None/False would silently drop what the caller asked to test.
    th_ = threading.Thread(target=run_tls_test_server, args=(msg, q_),
                           kwargs={"curve": curve, "cookie": cookie, "client_auth": client_auth,
                                   "handle_session_ticket": sess_in_out},
                           name="test_tls_client %s %s" % (suite, version))
    # The daemon attribute replaces the deprecated setDaemon() call
    th_.daemon = True
    th_.start()
    # Synchronise threads
    print("Syncrhonising...")
    assert q_.get(timeout=5) is True
    time.sleep(1)
    print("Thread synchronised")
    # Run client
    if sess_in_out:
        file_sess = scapy_path("/test/session")
        # First connection stores the session and leaves the server running
        run_tls_test_client(msg, suite, version, client_auth, key_update, session_ticket_file_out=file_sess,
                            stop_server=False)
        # Second connection reuses the stored session, then stops the server
        run_tls_test_client(msg, suite, version, client_auth, key_update, session_ticket_file_in=file_sess,
                            stop_server=True)
    else:
        run_tls_test_client(msg, suite, version, client_auth, key_update)
    # Wait for server
    print("Client running, waiting...")
    th_.join(5)
    if th_.is_alive():
        raise RuntimeError("Test timed out")
    # Return values
    if q_.empty():
        raise RuntimeError("Missing return value")
    ret = q_.get(timeout=5)
    print(ret)
    assert ret[0]

= Testing TLS server and client with SSLv2 and SSL_CK_DES_192_EDE3_CBC_WITH_MD5

test_tls_client("0700c0", "0002")

= Testing TLS client with SSLv3 and TLS_RSA_EXPORT_WITH_RC4_40_MD5

test_tls_client("0003", "0300")

= Testing TLS client with TLS 1.0 and TLS_DHE_RSA_WITH_CAMELLIA_256_CBC_SHA

test_tls_client("0088", "0301")

= Testing TLS client with TLS 1.1 and TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA

test_tls_client("c013", "0302")

= Testing TLS client with TLS 1.2 and TLS_DHE_RSA_WITH_AES_128_GCM_SHA256

test_tls_client("009e", "0303")

= Testing TLS client with TLS 1.2 and TLS_ECDH_anon_WITH_RC4_128_SHA

test_tls_client("c016", "0303")

= Testing TLS server and client with TLS 1.3 and TLS_AES_128_GCM_SHA256

test_tls_client("1301", "0304")

= Testing TLS server and client with TLS 1.3 and TLS_CHACHA20_POLY1305_SHA256
~ crypto_advanced

test_tls_client("1303", "0304")

= Testing TLS server and client with TLS 1.3 and TLS_AES_128_CCM_8_SHA256
~ crypto_advanced

test_tls_client("1305", "0304")

= Testing TLS server and client with TLS 1.3 and TLS_AES_128_CCM_8_SHA256 and x448
~ crypto_advanced

test_tls_client("1305", "0304", curve="x448")

= Testing TLS server and client with TLS 1.3 and a retry
~ crypto_advanced

test_tls_client("1302", "0304", curve="secp256r1", cookie=True)

= Testing TLS server and client with TLS 1.3 and TLS_AES_128_CCM_8_SHA256 and client auth
~ crypto_advanced

test_tls_client("1305", "0304", client_auth=True)

= Testing TLS server and client with TLS 1.3 and TLS_AES_128_CCM_8_SHA256 and key update
~ crypto_advanced

test_tls_client("1305", "0304", key_update=True)

= Testing TLS server and client with TLS 1.3 and TLS_AES_128_CCM_8_SHA256 and session resumption
~ crypto_advanced not_pypy

test_tls_client("1305", "0304", client_auth=True, sess_in_out=True)

= Clear session file

file_sess = scapy_path("/test/session")
try:
    os.remove(file_sess)
except OSError:
    # Best-effort cleanup: the session file may not exist. Only filesystem
    # errors are ignored, so KeyboardInterrupt and real bugs still surface.
    pass

# Automaton as Socket tests

+ TLSAutomatonClient socket tests
~ netaccess

= Connect to google.com

load_layer("tls")
load_layer("http")

def _test_connection():
    """Fetch a page from www.google.com over a TLSClientAutomaton link.
    Opens the link with ``TLSClientAutomaton.tlslink``, sends one HTTP request
    with ``sr1`` and asserts that the answer is an HTTPResponse whose payload
    contains a closing ``</html>`` tag.
    """
    a = TLSClientAutomaton.tlslink(HTTP, server="www.google.com", dport=443,
                                   server_name="www.google.com", debug=4)
    try:
        pkt = a.sr1(HTTP()/HTTPRequest(Host="www.google.com"),
                    session=TCPSession(app=True), timeout=2, retry=3)
    finally:
        # Close the link even when sr1() raises, so a failing attempt does
        # not leave it open before the next retry.
        a.close()
    assert pkt
    assert HTTPResponse in pkt
    assert b"</html>" in pkt[HTTPResponse].load

retry_test(_test_connection)