# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

# This script parses the LastDynamicAnalysis file generated by Valgrind running through CTest memcheck.
# It identifies any leaking file descriptors and triggers an error when detected.
# This enhances the capabilities of existing Valgrind checks.
# Output snippet for open file descriptors:
#  ==6652== FILE DESCRIPTORS: 6 open (3 std) at exit.
#  ==6652== Open AF_INET socket 6: 127.0.0.1:36915 <-> unbound
#  ==6652==    at 0x498B2EB: socket (syscall-template.S:120)
#  ==6652==    by 0x16CD16: s2n_new_inet_socket_pair (s2n_self_talk_ktls_test.c:69)
#  ==6652==    by 0x15DBB2: main (s2n_self_talk_ktls_test.c:168)
#  ==6652==
import os
import sys

# Status returned by main() when no leaking file descriptor is detected.
EXIT_SUCCESS = 0
# Exit with error code 1 if leaking fds are detected.
ERROR_EXIT_CODE = 1
# This test is designed to be informational only, so when a leak is detected we
# only print fifteen lines of the log, starting at the "FILE DESCRIPTORS:" line.
NUM_OF_LINES_TO_PRINT = 15


def find_log_file(path):
    """Return the path of the LastDynamicAnalysis log inside ``path``.

    The directory is listed with ``os.listdir`` and the first entry whose
    name contains "LastDynamicAnalysis" is selected.

    Args:
        path: Directory to search.

    Returns:
        ``path`` joined with the name of the first matching entry.

    Raises:
        FileNotFoundError: If no entry name contains "LastDynamicAnalysis".
    """
    candidates = (name for name in os.listdir(path) if "LastDynamicAnalysis" in name)
    log_name = next(candidates, None)
    if log_name is None:
        raise FileNotFoundError("LastDynamicAnalysis log file is not found!")

    return os.path.join(path, log_name)


def detect_leak(file):
    """Scan a Valgrind log for file descriptors left open at exit.

    Every line containing "FILE DESCRIPTORS:" is parsed for the number of
    open descriptors and the number of standard descriptors. When more
    descriptors are open than the standard ones plus one, the summary line
    and the lines that follow it (at most NUM_OF_LINES_TO_PRINT lines in
    total) are printed, followed by a blank line.

    Args:
        file: Open text file object providing ``readlines()``.

    Returns:
        True if at least one summary line reports a leak, otherwise False.

    Raises:
        ValueError: If a "FILE DESCRIPTORS:" line does not contain the
            expected "DESCRIPTORS:" / "std)" tokens or the counts are not
            integers.
    """
    fd_leak_detected = False
    lines = file.readlines()
    for i, line in enumerate(lines):
        if "FILE DESCRIPTORS:" not in line:
            continue

        # Example line: `==6096== FILE DESCRIPTORS: 4 open (3 std) at exit.`
        line_elements = line.split()
        open_fd_count = int(line_elements[line_elements.index("DESCRIPTORS:") + 1])
        # The token before "std)" looks like "(3"; drop the leading parenthesis.
        std_fd_count = int(line_elements[line_elements.index("std)") - 1][1:])

        # CTest memcheck writes to a LastDynamicAnalysis log file.
        # We allow that fd to remain opened.
        if open_fd_count > std_fd_count + 1:
            # Slice instead of indexing so that a summary close to the end of
            # the log prints the remaining lines rather than raising IndexError.
            for context_line in lines[i:i + NUM_OF_LINES_TO_PRINT]:
                print(context_line, end="")
            print()
            fd_leak_detected = True
    return fd_leak_detected


def main():
    """Check the CTest memcheck log for leaking file descriptors.

    Prints a banner, locates the LastDynamicAnalysis log in the directory
    given as the first command-line argument, and scans it with
    ``detect_leak``. The process exits with ERROR_EXIT_CODE when a leak is
    reported.

    Returns:
        EXIT_SUCCESS when no leak is detected.
    """
    # Print banner of the test
    banner = (
        "############################################################################",
        "################# Test for Leaking File Descriptors ########################",
        "############################################################################",
    )
    for banner_line in banner:
        print(banner_line)

    log_directory = sys.argv[1]
    log_path = find_log_file(log_directory)
    with open(log_path, 'r') as log_file:
        leak_found = detect_leak(log_file)
        if leak_found:
            sys.exit(ERROR_EXIT_CODE)

    return EXIT_SUCCESS


if __name__ == '__main__':
    # Hand the status returned by main() to the interpreter so the returned
    # code is the process exit status instead of being discarded.
    sys.exit(main())
