#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: 2024 Kienan Stewart <kstewart@efficios.com>
# SPDX-License-Identifier: GPL-2.0-only
#

"""
Tests the behaviour of the configuration and socket paths for lttng-sessiond, lttng-relayd, the consumer daemons, and liblttng-ctl.
"""

import copy
import os
import os.path
import pathlib
import platform
import pwd
import re
import socket
import stat
import subprocess
import sys
import tempfile
import time

# Import in-tree test utils
test_utils_import_path = pathlib.Path(__file__).absolute().parents[3] / "utils"
sys.path.append(str(test_utils_import_path))

import lttngtest
import bt2

user_info = pwd.getpwuid(os.getuid())
user_is_root = user_info.pw_uid == 0
user_home = os.path.expanduser("~")

DEFAULT_SUBSTITUTIONS = {
    "USER_HOME": user_home,
    "USER_ID": user_info.pw_uid,
    "SYSTEM_HOME": "@LTTNG_SYSTEM_RUNDIR@",
    "LTTNG_HOME": "@LTTNG_SYSTEM_RUNDIR@" if user_is_root else "{USER_HOME}",
    "RUNDIR": "@LTTNG_SYSTEM_RUNDIR@" if user_is_root else "{LTTNG_HOME}/.lttng",
    # If the values from the lttng-ust headers change, these will need to be updated.
    "LTTNG_UST_SOCKET_NAME": "lttng-ust-sock-8",
    "LTTNG_UST_WAIT_FILENAME": "lttng-ust-wait-8",
}

DEFAULT_EXPECTED_PATHS = {
    "rundir": {
        "type": "directory",
        "path": "{RUNDIR}",
    },
    "apps_unix_sock_path": {
        "type": "socket",
        "path": "{RUNDIR}/{LTTNG_UST_SOCKET_NAME}",
    },
    "wait_shm_path": {
        "type": "shm",
        "path": (
            "/{LTTNG_UST_WAIT_FILENAME}"
            if user_is_root
            else "/{LTTNG_UST_WAIT_FILENAME}-{USER_ID}"
        ),
    },
    "client_unix_sock_path": {
        "type": "socket",
        "path": "{RUNDIR}/client-lttng-sessiond",
    },
    "sessiond_health_unix_sock_path": {
        "type": "socket",
        "path": "{RUNDIR}/sessiond-health",
    },
    "sessiond_notification_unix_sock_path": {
        "type": "socket",
        "path": "{RUNDIR}/sessiond-notification",
    },
    "relayd_health_unix_sock_path": {
        "type": "socket",
        "path": "{RUNDIR}/relayd/health-{RELAYD_PID}",
    },
}

if platform.architecture()[0] == "64bit":
    DEFAULT_EXPECTED_PATHS["ustconsumerd64_health_unix_sock_path"] = {
        "type": "socket",
        "path": "{RUNDIR}/ustconsumerd64/health",
    }
else:
    DEFAULT_EXPECTED_PATHS["ustconsumerd32_healh_unix_sock_path"] = {
        "type": "socket",
        "path": "{RUNDIR}/ustconsumerd32/health",
    }

if lttngtest._Environment.run_kernel_tests():
    DEFAULT_EXPECTED_PATHS["kconsumerd_health_unix_sock_path"] = {
        "type": "socket",
        "path": "{RUNDIR}/kconsumerd/health",
    }

# These are used for compatibility with python < 3.9. If the python
# version is 3.9+, `dict_a | dict_b` can be used instead.
_ust_ctl_path_expected_paths = copy.deepcopy(DEFAULT_EXPECTED_PATHS)
_ust_ctl_path_expected_paths.update(
    {
        "apps_unix_sock_path": {
            "type": "socket",
            "path": "{LTTNG_UST_CTL_PATH}/{LTTNG_UST_SOCKET_NAME}",
        },
    }
)

tests = {
    "default": {
        "description": "LTTng with no specific environment settings",
        "environment_variables": {},
        "substitutions": DEFAULT_SUBSTITUTIONS,
        "expected_paths": DEFAULT_EXPECTED_PATHS,
    },
    "lttng_home": {
        "description": "LTTng with LTTNG_HOME set",
        "environment_variables": {
            # This value will come from the lttngtest environment
            "LTTNG_HOME": True,
        },
        "substitutions": DEFAULT_SUBSTITUTIONS,
        "expected_paths": DEFAULT_EXPECTED_PATHS,
    },
    "ust_ctl_path": {
        "description": "LTTng with LTTNG_UST_CTL_PATH set",
        "environment_variables": {
            # This value will be generated in the lttngtest environment
            "LTTNG_UST_CTL_PATH": True,
        },
        "substitutions": DEFAULT_SUBSTITUTIONS,
        "expected_paths": _ust_ctl_path_expected_paths,
    },
    "lttng_home_and_ust_ctl_path": {
        "description": "LTTng with LTTNG_HOME and LTTNG_UST_CTL_PATH set",
        "environment_variables": {
            "LTTNG_HOME": True,
            "LTTNG_UST_CTL_PATH": True,
        },
        "substitutions": DEFAULT_SUBSTITUTIONS,
        "expected_paths": _ust_ctl_path_expected_paths,
    },
    "lttng_rundir": {
        "description": "LTTng with LTTNG_RUNDIR set",
        "environment_variables": {
            "LTTNG_RUNDIR": True,
        },
        "substitutions": DEFAULT_SUBSTITUTIONS,
        "expected_paths": DEFAULT_EXPECTED_PATHS,
    },
    "lttng_home_and_rundir": {
        "description": "LTTng with LTTNG_RUNDIR and LTTNG_HOME set",
        "environment_variables": {
            "LTTNG_RUNDIR": True,
            "LTTNG_HOME": True,
        },
        "substitutions": DEFAULT_SUBSTITUTIONS,
        "expected_paths": DEFAULT_EXPECTED_PATHS,
    },
    "lttng_rundir_and_ust_ctl_path": {
        "description": "LTTng with LTTNG_RUNDIR and LTTNG_UST_CTL_PATH set",
        "environment_variables": {
            "LTTNG_RUNDIR": True,
            "LTTNG_UST_CTL_PATH": True,
        },
        "substitutions": DEFAULT_SUBSTITUTIONS,
        "expected_paths": _ust_ctl_path_expected_paths,
    },
    "lttng_home_and_rundir_and_ust_ctl_path": {
        "description": "LTTng with LTTNG_RUNDIR, LTTNG_HOME, and LTTNG_UST_CTL_PATH set",
        "environment_variables": {
            "LTTNG_RUNDIR": True,
            "LTTNG_HOME": True,
            "LTTNG_UST_CTL_PATH": True,
        },
        "substitutions": DEFAULT_SUBSTITUTIONS,
        "expected_paths": _ust_ctl_path_expected_paths,
    },
}


class MissingDict(dict):
    def __missing__(self, key):
        return key


def test_config_and_socket_paths(
    test_env,
    tap,
    test_identifier,
    description,
    environment_variables,
    substitutions,
    expected_paths,
):
    tap.diagnostic(
        "[{}] Config and socket paths - {}".format(test_identifier, description)
    )

    if (
        "LTTNG_HOME" in environment_variables
        and test_env.lttng_home_location is not None
    ):
        environment_variables["LTTNG_HOME"] = str(test_env.lttng_home_location)
        substitutions.update(environment_variables)

    substitutions["RELAYD_PID"] = test_env._relayd.pid

    client = lttngtest.LTTngClient(
        test_env, log=tap.diagnostic, extra_env_vars=environment_variables
    )
    session_output_location = lttngtest.LocalSessionOutputLocation(
        test_env.create_temporary_directory("trace")
    )
    session = client.create_session(output=session_output_location)
    channel = session.add_channel(lttngtest.lttngctl.TracingDomain.User)
    channel.add_recording_rule(lttngtest.lttngctl.UserTracepointEventRule("tp:tptest"))

    kernel_channel = None
    if test_env.run_kernel_tests():
        kernel_channel = session.add_channel(lttngtest.lttngctl.TracingDomain.Kernel)
        kernel_channel.add_recording_rule(
            lttngtest.lttngctl.KernelTracepointEventRule("*")
        )

    session.start()
    test_app = test_env.launch_wait_trace_test_application(100)
    test_app.trace()
    test_app.wait_for_tracing_done()
    test_app.wait_for_exit()

    errors = []
    if "LTTNG_RUNDIR" in environment_variables:
        os.system("find {}".format(environment_variables["LTTNG_RUNDIR"]))
    for path_id, path_conf in expected_paths.items():
        path = path_conf["path"]
        while re.search("\{\w+\}", path) is not None:
            path = path.format_map(MissingDict(substitutions))
        tap.diagnostic(
            "Checking for file type `{}` at `{}`".format(path_conf["type"], path)
        )
        if path_conf["type"] == "file":
            if not os.path.isfile(path):
                tap.diagnostic("`{}` is not a file".format(path))
                errors.append(path_id)
        elif path_conf["type"] == "socket":
            try:
                if not stat.S_ISSOCK(os.stat(path).st_mode):
                    tap.diagnostic("`{}` is not a socket".format(path))
                    errors.append(path_id)
            except Exception as e:
                tap.diagnostic(
                    "Exception while checking socket `{}`: {}".format(path, str(e))
                )
                errors.append(path_id)
        elif path_conf["type"] == "directory":
            if not os.path.isdir(path):
                tap.diagnostic("`{}` is not a directory".format(path))
                errors.append(path_id)
        elif path_conf["type"] == "shm":
            # @FIXME: Portability on Windows and MacOSX
            if platform.system() != "Linux":
                tap.diagnostic("Skipping check of shm at `{}`".format(path))
                continue
            path = "/dev/shm/{}".format(path)
            if not os.path.isfile(path):
                tap.diagnostic("Shared memory at `{}` is not a file".format(path))
                errors.append(path_id)
        else:
            tap.diagnostic(
                "Unknown type `{}` for path `{}`".format(path_conf["type"], path)
            )
            errors.append(path_id)
    tap.test(
        len(errors) == 0,
        "{} expected configuration paths exist. {} errors".format(
            len(expected_paths), len(errors)
        ),
    )

    session.stop()
    session.destroy()


if __name__ == "__main__":
    tap = lttngtest.TapGenerator(len(tests))

    for test_identifier, test_configuration in tests.items():
        temp_conf = copy.deepcopy(test_configuration)
        skip_lttng_home = (
            "LTTNG_HOME" not in test_configuration["environment_variables"]
        )
        if skip_lttng_home and os.getenv("LTTNG_HOME", None) is not None:
            tap.skip(
                "LTTNG_HOME is already set, skipping test that should exercise the default value"
            )
            continue

        # This will hold keep the temporary object file alive until the next
        # iteration. When the object is destroy, the tempfile will be cleaned
        # up.
        tempfiles = []
        if "LTTNG_UST_CTL_PATH" in test_configuration["environment_variables"]:
            tempfiles.append(tempfile.TemporaryDirectory())
            temp_conf["environment_variables"]["LTTNG_UST_CTL_PATH"] = tempfiles[
                -1
            ].name
            temp_conf["substitutions"]["LTTNG_UST_CTL_PATH"] = tempfiles[-1].name
        if "LTTNG_RUNDIR" in test_configuration["environment_variables"]:
            tempfiles.append(tempfile.TemporaryDirectory())
            temp_conf["environment_variables"]["LTTNG_RUNDIR"] = tempfiles[-1].name
            temp_conf["substitutions"]["RUNDIR"] = tempfiles[-1].name
        with lttngtest.test_environment(
            with_sessiond=True,
            log=tap.diagnostic,
            with_relayd=True,
            extra_env_vars=temp_conf["environment_variables"],
            skip_temporary_lttng_home=skip_lttng_home,
            enable_kernel_domain=lttngtest._Environment.run_kernel_tests(),
        ) as test_env:
            test_config_and_socket_paths(test_env, tap, test_identifier, **temp_conf)
    sys.exit(0 if tap.is_successful else 1)
