1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
|
# Copyright (c) Meta Platforms, Inc. and affiliates.
# SPDX-License-Identifier: LGPL-2.1-or-later
import os
import signal
from drgn.helpers.linux.cpumask import for_each_possible_cpu
from drgn.helpers.linux.pid import find_task
from drgn.helpers.linux.sched import (
cpu_curr,
idle_task,
loadavg,
task_cpu,
task_state_to_char,
task_thread_info,
)
from tests.linux_kernel import (
LinuxKernelTestCase,
fork_and_stop,
proc_state,
skip_unless_have_test_kmod,
smp_enabled,
wait_until,
)
class TestSched(LinuxKernelTestCase):
    """Exercise the scheduler helpers imported from ``drgn.helpers.linux.sched``."""

    @skip_unless_have_test_kmod
    def test_task_thread_info(self):
        """Check task_thread_info() against the value published by the test kmod.

        Compares the helper's result for the ``drgn_test_kthread`` object with
        the ``drgn_test_kthread_info`` object looked up in the same program.
        """
        self.assertEqual(
            task_thread_info(self.prog["drgn_test_kthread"]),
            self.prog["drgn_test_kthread_info"],
        )

    def test_task_cpu(self):
        """Check that task_cpu() reports the CPU a child was pinned to."""
        # Use the highest CPU ID this process is allowed to run on.
        # os.cpu_count() - 1 is not a safe choice: cpu_count() may return None,
        # and it counts CPUs rather than naming one in our affinity mask, so
        # with a restricted cpuset or sparse CPU numbering sched_setaffinity()
        # would fail with EINVAL before the helper is ever exercised.
        cpu = max(os.sched_getaffinity(0))
        # NOTE(review): fork_and_stop presumably runs the given callable in a
        # forked child and then leaves the child stopped - confirm in
        # tests.linux_kernel.
        with fork_and_stop(os.sched_setaffinity, 0, (cpu,)) as (pid, _):
            self.assertEqual(task_cpu(find_task(self.prog, pid)), cpu)

    def test_task_state_to_char(self):
        """Check task_state_to_char() for the R, S, T and Z states.

        The current process is expected to be "R". A forked child is then
        driven through sleeping ("S"), stopped ("T") and zombie ("Z"), and the
        helper's answer is compared at each step.
        """
        task = find_task(self.prog, os.getpid())
        self.assertEqual(task_state_to_char(task), "R")

        pid = os.fork()
        try:
            if pid == 0:
                # Child: block in pause() until a signal terminates it. The
                # os._exit() in the finally clause ensures the child can never
                # fall through into the parent's test logic below.
                try:
                    while True:
                        signal.pause()
                finally:
                    os._exit(1)

            # Parent: look the child up once and reuse the task object while
            # the child moves between states.
            task = find_task(self.prog, pid)

            # Each wait_until() polls proc_state() until the child has
            # actually reached the state we are about to assert on.
            wait_until(lambda: proc_state(pid) == "S")
            self.assertEqual(task_state_to_char(task), "S")

            os.kill(pid, signal.SIGSTOP)
            wait_until(lambda: proc_state(pid) == "T")
            self.assertEqual(task_state_to_char(task), "T")

            # The child is killed but not reaped yet, so it stays a zombie.
            os.kill(pid, signal.SIGKILL)
            wait_until(lambda: proc_state(pid) == "Z")
            self.assertEqual(task_state_to_char(task), "Z")
        finally:
            # Always kill and reap the child, even if an assertion failed
            # part-way through, so no stray process outlives the test.
            os.kill(pid, signal.SIGKILL)
            os.waitpid(pid, 0)

    def test_cpu_curr(self):
        """Check that cpu_curr() returns this process for the CPU it is pinned to."""
        task = find_task(self.prog, os.getpid())
        old_affinity = os.sched_getaffinity(0)
        # Pick a CPU from our own affinity mask rather than os.cpu_count() - 1,
        # which may be None-based arithmetic or a CPU we are not allowed on.
        cpu = max(old_affinity)
        os.sched_setaffinity(0, (cpu,))
        try:
            self.assertEqual(cpu_curr(self.prog, cpu), task)
        finally:
            # Restore the original mask so later tests are not left pinned.
            os.sched_setaffinity(0, old_affinity)

    def test_idle_task(self):
        """Check the comm of each idle task returned by idle_task().

        With SMP enabled every possible CPU is expected to report
        ``swapper/<cpu>``; otherwise CPU 0 is expected to report ``swapper``.
        """
        if smp_enabled():
            for cpu in for_each_possible_cpu(self.prog):
                self.assertEqual(
                    idle_task(self.prog, cpu).comm.string_(), f"swapper/{cpu}".encode()
                )
        else:
            self.assertEqual(idle_task(self.prog, 0).comm.string_(), b"swapper")

    def test_loadavg(self):
        """Check that loadavg() yields three non-negative values."""
        values = loadavg(self.prog)
        self.assertEqual(len(values), 3)
        # Assert per value so a failure reports the offending number instead
        # of a bare "False is not true".
        for value in values:
            self.assertGreaterEqual(value, 0.0)
|