#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1-only
#
# Test to replace idle_thread of scope unit using cgexec -r
#
# Copyright (c) 2023 Oracle and/or its affiliates.
# Author: Kamalesh Babulal <kamalesh.babulal@oracle.com>
#

from multiprocessing import active_children
from process import Process
from systemd import Systemd
from cgroup import Cgroup
from run import RunError
import consts
import ftests
import time
import sys
import os

CONTROLLER = 'cpu'
SLICE = 'libcgroup.slice'
CGNAME = os.path.join(SLICE, '085cgcreate.scope')
OUT_OF_SCOPE_CGNAME1 = '085outofscope'
OUT_OF_SCOPE_CGNAME2 = '085outofscope.scope'


def prereqs(config):
    result = consts.TEST_PASSED
    cause = None

    if config.args.container:
        result = consts.TEST_SKIPPED
        cause = 'This test cannot be run within a container'

    if not Systemd.is_systemd_enabled():
        result = consts.TEST_SKIPPED
        cause = 'Systemd support not compiled in'

    return result, cause


def setup(config):
    pass


def test(config):
    result = consts.TEST_PASSED
    cause = None

    Cgroup.create_and_validate(config, CONTROLLER, CGNAME, create_scope=True)

    #
    # Test 1: call cgexec -r to replace idle_thread with infinite while loop
    #
    idle_pid = Cgroup.get_pids_in_cgroup(config, CGNAME, CONTROLLER)[0]
    config.process.create_process_in_cgroup(config, CONTROLLER, CGNAME, cgclassify=False,
                                            replace_idle=True)

    # We need pause, before the cgroups.procs gets updated, post cgexec
    time.sleep(1)

    pid = Cgroup.get_pids_in_cgroup(config, CGNAME, CONTROLLER)[0]
    if idle_pid == pid:
        result = consts.TEST_FAILED
        cause = 'Failed to replace scope idle_thread pid {} {}'.format(idle_pid, pid)
        return result, cause

    #
    # Test 2: call cgexec -r to replace non-idle_thread (while loop,
    #         previously created) with another while loop. It's expected to fail,
    #         because only idle_thread is allowed to be replaced.
    #
    config.process.create_process_in_cgroup(config, CONTROLLER, CGNAME,
                                            cgclassify=False, replace_idle=True)

    # We need pause, before the cgroups.procs gets updated, post cgexec
    time.sleep(1)

    replace_pid = Cgroup.get_pids_in_cgroup(config, CGNAME, CONTROLLER)[0]
    if replace_pid != pid:
        result = consts.TEST_FAILED
        cause = ('Erroneously replaced scope non idle_thread pid {} with '
                 'pid {}'.format(pid, replace_pid))
        return result, cause

    #
    # Test 3: create a non-scope cgroup, try creating a task with replace_idle
    #         set. It should fail way before creating a task, during cgroup name
    #         check.
    Cgroup.create_and_validate(config, CONTROLLER, OUT_OF_SCOPE_CGNAME1)
    config.process.create_process_in_cgroup(config, CONTROLLER, OUT_OF_SCOPE_CGNAME1,
                                            cgclassify=False, replace_idle=True)

    # We need pause, before the cgroups.procs gets updated, post cgexec
    time.sleep(1)

    pid = Cgroup.get_pids_in_cgroup(config, OUT_OF_SCOPE_CGNAME1, CONTROLLER)
    if len(pid) != 0:
        result = consts.TEST_FAILED
        cause = ('Erroneously succeeded in creating task in non scope cgroup: '
                 'pid {}'.format(pid))
        return result, cause

    #
    # Test 4: create a non-scope cgroup with .scope suffix, try creating a task with
    #         replace_idle set. It should fail when trying to find idle_task
    #         check.
    #
    Cgroup.create_and_validate(config, CONTROLLER, OUT_OF_SCOPE_CGNAME2)
    config.process.create_process_in_cgroup(config, CONTROLLER, OUT_OF_SCOPE_CGNAME2,
                                            cgclassify=False, replace_idle=True)

    # We need pause, before the cgroups.procs gets updated, post cgexec
    time.sleep(1)

    pid = Cgroup.get_pids_in_cgroup(config, OUT_OF_SCOPE_CGNAME2, CONTROLLER)
    if len(pid) != 0:
        result = consts.TEST_FAILED
        cause = ('Erroneously succeeded in creating task in non scope cgroup: '
                 'pid {}'.format(pid))

    return result, cause


def teardown(config):
    Cgroup.delete(config, CONTROLLER, OUT_OF_SCOPE_CGNAME1)
    Cgroup.delete(config, CONTROLLER, OUT_OF_SCOPE_CGNAME2)

    pid = Cgroup.get_pids_in_cgroup(config, CGNAME, CONTROLLER)
    Process.kill(config, pid)

    # kill the process, that was created by us but failed to migrate to the cgroup
    active = active_children()
    active[0].terminate()

    # systemd will automatically remove the cgroup once there are no more pids in
    # the cgroup, so we don't need to delete CGNAME.  But let's try to remove the
    # slice
    try:
        Cgroup.delete(config, CONTROLLER, SLICE)
    except RunError:
        pass


def main(config):
    [result, cause] = prereqs(config)
    if result != consts.TEST_PASSED:
        return [result, cause]

    setup(config)

    [result, cause] = test(config)
    teardown(config)

    return [result, cause]


if __name__ == '__main__':
    config = ftests.parse_args()
    # this test was invoked directly.  run only it
    config.args.num = int(os.path.basename(__file__).split('-')[0])
    sys.exit(ftests.main(config))

# vim: set et ts=4 sw=4:
