1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
#!/usr/bin/python3
"""Windows: Implementation of an HTTP server"""
import argparse
import os
import subprocess
import sys
import threading
from urllib.parse import urlparse
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from make_certificates import CertificateMaker
# Root of every path below: the current working directory at import time.
RESULT_PATH = os.getcwd()
# Working directories, all located under "<RESULT_PATH>/Testing".
# FILES_PATH holds the time-stamp request/response files, CERTS_PATH the CRL
# files served by do_GET, CONF_PATH the OpenSSL configuration and LOGS_PATH
# the log files.
FILES_PATH = os.path.join(RESULT_PATH, "./Testing/files/")
CERTS_PATH = os.path.join(RESULT_PATH, "./Testing/certs/")
CONF_PATH = os.path.join(RESULT_PATH, "./Testing/conf/")
LOGS_PATH = os.path.join(RESULT_PATH, "./Testing/logs/")
# REQUEST receives the body of a POSTed time-stamp query; RESPONS is the
# file the openssl command writes and whose content is sent back.
REQUEST = os.path.join(FILES_PATH, "./jreq.tsq")
RESPONS = os.path.join(FILES_PATH, "./jresp.tsr")
# Configuration file handed to "openssl ts" through -config.
OPENSSL_CONF = os.path.join(CONF_PATH, "./openssl_tsa.cnf")
# SERVER_LOG is where main() redirects stdout and stderr.
SERVER_LOG = os.path.join(LOGS_PATH, "./server.log")
# URL_LOG is written by main() with "127.0.0.1:<port>" and removed again when
# a POST to "/kill_server" arrives.
URL_LOG = os.path.join(LOGS_PATH, "./url.log")
# Argument vector run by do_POST through subprocess.run(): "openssl ts -reply"
# reads the query from REQUEST and writes the reply to RESPONS, using
# OPENSSL_CONF and the pass phrase given with -passin.
OPENSSL_TS = ["openssl", "ts",
"-reply", "-config", OPENSSL_CONF,
"-passin", "pass:passme",
"-queryfile", REQUEST,
"-out", RESPONS]
class RequestHandler(SimpleHTTPRequestHandler):
    """Handle the HTTP GET and POST requests that arrive at the server.

    GET serves the CRL files stored in CERTS_PATH.  POST either shuts the
    server down ("/kill_server") or treats the request body as a time-stamp
    query and answers with the reply produced by the OPENSSL_TS command.

    No ``__init__`` override is needed: the socketserver base class already
    stores the server handle in ``self.server`` before handling the request.
    """

    # Request path -> name of the DER encoded CRL file inside CERTS_PATH.
    CRL_FILES = {
        "/intermediateCA": "CACertCRL.der",
        "/TSACA": "TSACertCRL.der",
    }

    def do_GET(self):  # pylint: disable=invalid-name
        """Serve the GET request type.

        A path listed in CRL_FILES is answered with the content of the
        matching file, or with 404 when that file cannot be read.  Any other
        path is answered with status 200 and an empty body.
        """
        try:
            url = urlparse(self.path)
            file_name = self.CRL_FILES.get(url.path)
            resp_data = b''
            if file_name is not None:
                # Read the file before any header is sent, so that a failure
                # can still be reported with a proper error status.
                try:
                    file_path = os.path.join(CERTS_PATH, file_name)
                    with open(file_path, 'rb') as file:
                        resp_data = file.read()
                except OSError as err:
                    print("HTTP GET request error: {}".format(err))
                    self.send_error(404, "CRL file is not available")
                    return
            self.send_response(200)
            self.send_header("Content-type", "application/pkix-crl")
            self.end_headers()
            self.wfile.write(resp_data)
        except Exception as err:  # pylint: disable=broad-except
            # Request boundary: log and drop, never take the server down.
            print("HTTP GET request error: {}".format(err))

    def do_POST(self):  # pylint: disable=invalid-name
        """Serve the POST request type.

        "/kill_server" stops the server; every other path is handled as a
        time-stamp request.
        """
        try:
            if urlparse(self.path).path == "/kill_server":
                self._kill_server()
            else:
                self._send_timestamp_reply()
        except Exception as err:  # pylint: disable=broad-except
            # Request boundary: log and drop, never take the server down.
            print("HTTP POST request error: {}".format(err))

    def _kill_server(self):
        """Remove URL_LOG, confirm to the client and shut the server down."""
        # The path is passed as an argument because log_message() applies
        # %-formatting to its first parameter; a '%' in the path would
        # otherwise be read as a conversion specifier.
        self.log_message("Deleting file: %s", URL_LOG)
        try:
            os.remove(URL_LOG)
        except FileNotFoundError:
            # Nothing to delete; the shutdown request is still honoured.
            pass
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        self.wfile.write(bytes('Shutting down HTTP server', 'utf-8'))
        # shutdown() waits for serve_forever() to return, so it must be
        # called from a thread other than the one running serve_forever().
        self.server.shutdown()

    def _send_timestamp_reply(self):
        """Store the posted query, run OPENSSL_TS and send back its output.

        Replies 411 when Content-Length is missing, 400 when it is not a
        non-negative integer and 500 when the reply cannot be produced.
        """
        length_header = self.headers.get('Content-Length')
        if length_header is None:
            self.send_error(411, "Content-Length header is required")
            return
        try:
            content_length = int(length_header)
        except ValueError:
            content_length = -1
        if content_length < 0:
            # A negative size would make rfile.read() wait for end of stream.
            self.send_error(400, "Invalid Content-Length header")
            return
        post_data = self.rfile.read(content_length)
        # NOTE(review): REQUEST and RESPONS are shared by all requests, so
        # concurrent time-stamp requests would overwrite each other's files.
        try:
            with open(REQUEST, mode="wb") as file:
                file.write(post_data)
            # check=True raises CalledProcessError on a non-zero exit status.
            subprocess.run(OPENSSL_TS, check=True, universal_newlines=True)
            with open(RESPONS, mode="rb") as file:
                resp_data = file.read()
        except (OSError, subprocess.CalledProcessError) as err:
            print("HTTP POST request error: {}".format(err))
            self.send_error(500, "Unable to create the time-stamp reply")
            return
        # The status line is sent only once the reply is known to exist.
        self.send_response(200)
        self.send_header("Content-type", "application/timestamp-reply")
        self.end_headers()
        self.wfile.write(resp_data)
class HttpServerThread():
    """Holder of the HTTP server object and of the thread that runs it"""
    # pylint: disable=too-few-public-methods

    def __init__(self):
        # Both handles stay None until start_server() has been called.
        self.server_thread = None
        self.server = None

    def start_server(self, port) -> int:
        """Bind the HTTP server to 127.0.0.1:port and serve from a new thread.

        Returns the port number reported by the server socket after binding.
        """
        httpd = ThreadingHTTPServer(('127.0.0.1', port), RequestHandler)
        self.server = httpd
        worker = threading.Thread(target=httpd.serve_forever)
        self.server_thread = worker
        worker.start()
        # server_address may carry more than two items; only host and port
        # are of interest here.
        bound_host, bound_port = httpd.server_address[:2]
        print("HTTP server started, URL http://{}:{}".format(bound_host, bound_port))
        return bound_port
def main() -> None:
    """Start the HTTP server and create the test certificates.

    Parses the optional ``--port`` argument (default 0), redirects stdout and
    stderr to SERVER_LOG, starts the server, writes "127.0.0.1:<port>" to
    URL_LOG and runs CertificateMaker.make_certs().

    Exits with status 0 on success and with a non-zero status when an
    OSError is caught.  Any other exception is no longer discarded: it
    propagates instead of being replaced by a successful exit.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--port",
        type=int,
        default=0,
        help="port number"
    )
    args = parser.parse_args()
    ret = 0
    try:
        # Start from an empty log, then share ONE append-mode, line-buffered
        # handle between stdout and stderr.  Two independent handles ("w" and
        # "a") on the same file let buffered stdout output overwrite what
        # stderr had already appended.
        with open(SERVER_LOG, "w"):
            pass
        # The log stays open for the lifetime of the process on purpose.
        # pylint: disable=consider-using-with
        log_file = open(SERVER_LOG, "a", buffering=1)
        sys.stdout = log_file
        sys.stderr = log_file
        server = HttpServerThread()
        port = server.start_server(args.port)
        with open(URL_LOG, mode="w") as file:
            file.write("127.0.0.1:{}".format(port))
        tests = CertificateMaker(port, SERVER_LOG)
        tests.make_certs()
    except OSError as err:
        print("OSError: {}".format(err))
        # errno may be None (or 0); never report such a failure as success.
        ret = err.errno or 1
    # Called outside of a "finally" clause: raising SystemExit from "finally"
    # would swallow any other exception that is still propagating.
    sys.exit(ret)


if __name__ == '__main__':
    main()
# pylint: disable=pointless-string-statement
"""Local Variables:
c-basic-offset: 4
tab-width: 4
indent-tabs-mode: nil
End:
vim: set ts=4 expandtab:
"""
|