1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
#!/usr/bin/env python
from __future__ import print_function
"""
server3 from the book 'Network Security with OpenSSL', but modified to
Python/M2Crypto from the original C implementation.
Copyright (c) 2004-2005 Open Source Applications Foundation.
Author: Heikki Toivonen
"""
from M2Crypto import SSL, Rand, threading, DH
import thread
from socket import *
verbose_debug = 1
def verify_callback(ok, store):
if not ok:
print("***Verify Not ok")
return ok
dh1024 = None
def init_dhparams():
global dh1024
dh1024 = DH.load_params("dh1024.pem")
def tmp_dh_callback(ssl, is_export, keylength):
global dh1024
if not dh1024:
init_dhparams()
return dh1024._ptr()
def setup_server_ctx():
ctx = SSL.Context("sslv23")
if ctx.load_verify_locations("ca.pem") != 1:
print("***No CA file")
# if ctx.set_default_verify_paths() != 1:
# print("***No default verify paths")
ctx.load_cert_chain("server.pem")
ctx.set_verify(
SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 10, verify_callback
)
ctx.set_options(SSL.op_all | SSL.op_no_sslv2)
ctx.set_tmp_dh_callback(tmp_dh_callback)
# ctx.set_tmp_dh('dh1024.pem')
if ctx.set_cipher_list("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH") != 1:
print("***No valid ciphers")
if verbose_debug:
ctx.set_info_callback()
return ctx
def post_connection_check(peerX509, expectedHost):
if peerX509 is None:
print("***No peer certificate")
# Not sure if we can do any other checks
return 1
def do_server_loop(conn):
while 1:
try:
buf = conn.read()
if not buf:
break
print(buf)
except SSL.SSLError as what:
if str(what) == "unexpected eof":
break
else:
raise
except:
break
if conn.get_shutdown():
return 1
return 0
# How about something like:
# def server_thread(ctx, ssl, addr):
# conn = SSL.Connection(ctx, None)
# conn.ssl = ssl
# conn.setup_addr(addr)
def server_thread(ctx, sock, addr):
conn = SSL.Connection(ctx, sock)
conn.set_post_connection_check_callback(post_connection_check)
conn.setup_addr(addr)
conn.set_accept_state()
conn.setup_ssl()
conn.accept_ssl()
post_connection_check(conn)
print("SSL Connection opened")
if do_server_loop(conn):
conn.close()
else:
conn.clear()
print("SSL Connection closed")
if __name__ == "__main__":
threading.init()
Rand.load_file("../randpool.dat", -1)
ctx = setup_server_ctx()
# How about something like this?
# conn_root = SSL.Connection(ctx)
# conn_root.bind(('127.0.0.1', 9999))
# conn_root.listen(5)
# while 1:
# ssl, addr = conn_root.accept()
# thread.start_new_thread(server_thread, (ctx, ssl, addr))
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(("", 9999))
sock.listen(5)
while 1:
conn, addr = sock.accept()
thread.start_new_thread(server_thread, (ctx, conn, addr))
Rand.save_file("../randpool.dat")
threading.cleanup()
|