File: test_mm.py

package info (click to toggle)
drgn 0.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 7,852 kB
  • sloc: python: 74,992; ansic: 54,589; awk: 423; makefile: 351; sh: 99
file content (285 lines) | stat: -rw-r--r-- 11,377 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# Copyright (c) Meta Platforms, Inc. and affiliates.
# Copyright (c) 2025, Kylin Software, Inc. and affiliates.
# SPDX-License-Identifier: LGPL-2.1-or-later

import mmap
import os
import re

from drgn import Object
from drgn.helpers.linux.cpumask import for_each_online_cpu
from drgn.helpers.linux.mm import PFN_PHYS, TaskRss, phys_to_virt
from drgn.helpers.linux.percpu import per_cpu_ptr
from tests.linux_kernel import (
    skip_if_highmem,
    skip_unless_have_full_mm_support,
    skip_unless_have_test_disk,
    skip_unless_have_test_kmod,
)
from tests.linux_kernel.crash_commands import CrashCommandTestCase
from tests.linux_kernel.helpers.test_mm import MmTestCase
from tests.linux_kernel.helpers.test_swap import tmp_swaps


class TestBtop(CrashCommandTestCase):
    def test_single(self):
        addr = mmap.PAGESIZE * 2
        cmd = self.check_crash_command(f"btop {addr:x}")
        self.assertEqual(cmd.stdout, f"{addr:x}: 2\n")
        self.assertEqual(cmd.drgn_option.globals["phys_addr"], addr)
        self.assertEqual(cmd.drgn_option.globals["pfn"], 2)

    def test_multiple(self):
        addr1 = mmap.PAGESIZE * 2
        addr2 = mmap.PAGESIZE * 10
        cmd = self.check_crash_command(f"btop {addr1:x} {addr2:x}")
        self.assertEqual(cmd.stdout, f"{addr1:x}: 2\n{addr2:x}: a\n")
        self.assertRegex(cmd.drgn_option.stdout, r"\bfor\b.*\bin\b")
        self.assertEqual(cmd.drgn_option.globals["phys_addr"], addr2)
        self.assertEqual(cmd.drgn_option.globals["pfn"], 10)


class TestPtob(CrashCommandTestCase):
    def test_single(self):
        addr = mmap.PAGESIZE * 2
        cmd = self.check_crash_command("ptob 2")
        self.assertEqual(cmd.stdout, f"2: {addr:x}\n")
        self.assertEqual(cmd.drgn_option.globals["pfn"], 2)
        self.assertEqual(cmd.drgn_option.globals["phys_addr"], addr)

    def test_multiple(self):
        addr1 = mmap.PAGESIZE * 2
        addr2 = mmap.PAGESIZE * 10
        cmd = self.check_crash_command("ptob 2 10")
        self.assertEqual(cmd.stdout, f"2: {addr1:x}\na: {addr2:x}\n")
        self.assertRegex(cmd.drgn_option.stdout, r"\bfor\b.*\bin\b")
        self.assertEqual(cmd.drgn_option.globals["pfn"], 10)
        self.assertEqual(cmd.drgn_option.globals["phys_addr"], addr2)


class TestPtov(CrashCommandTestCase):
    @skip_unless_have_full_mm_support
    def test_phy_to_virt(self):
        """Test physical address to virtual address conversion."""
        phys_addr = 0x123
        virt_addr = phys_to_virt(self.prog, phys_addr)
        virt_addr_int = virt_addr.value_()

        cmd = self.check_crash_command(f"ptov {hex(phys_addr)}")
        self.assertRegex(cmd.stdout, r"(?m)^\s*VIRTUAL\s+PHYSICAL")
        self.assertRegex(cmd.stdout, rf"(?m)^\s*{virt_addr_int:016x}\s+{phys_addr:x}")

    def test_per_cpu_offset_single_cpu(self):
        """Test per-CPU offset conversion for a single CPU."""
        offset = 0x100
        cpu = 0
        ptr = Object(self.prog, "unsigned long", offset)
        virt_ptr = per_cpu_ptr(ptr, cpu)
        virt_int = virt_ptr.value_()

        cmd = self.check_crash_command(f"ptov {hex(offset)}:{cpu}")
        self.assertRegex(cmd.stdout, rf"(?m)^\s*PER-CPU OFFSET:\s+{offset:x}")
        self.assertRegex(cmd.stdout, r"(?m)^\s*CPU\s+VIRTUAL")
        self.assertRegex(cmd.stdout, rf"(?m)^\s*\[{cpu}\]\s+{virt_int:016x}")

    def test_per_cpu_offset_all_cpus(self):
        """Test per-CPU offset conversion for all CPUs."""
        offset = 0x200
        cmd = self.check_crash_command(f"ptov {hex(offset)}:a")

        self.assertRegex(cmd.stdout, rf"(?m)^\s*PER-CPU OFFSET:\s+{offset:x}")
        self.assertRegex(cmd.stdout, r"(?m)^\s*CPU\s+VIRTUAL")

        ptr = Object(self.prog, "unsigned long", offset)
        for cpu in for_each_online_cpu(self.prog):
            virt = per_cpu_ptr(ptr, cpu)
            self.assertRegex(cmd.stdout, rf"(?m)^\s*\[{cpu}\]\s+{virt.value_():016x}")

    def test_per_cpu_offset_cpu_list(self):
        """Test per-CPU offset conversion for a CPU list."""
        offset = 0x300
        cpus = sorted(os.sched_getaffinity(0))
        cmd = self.check_crash_command(f"ptov {hex(offset)}:{','.join(map(str, cpus))}")

        self.assertRegex(cmd.stdout, rf"(?m)^\s*PER-CPU OFFSET:\s+{offset:x}")
        self.assertRegex(cmd.stdout, r"(?m)^\s*CPU\s+VIRTUAL")

        ptr = Object(self.prog, "unsigned long", offset)
        for cpu in cpus:
            virt = per_cpu_ptr(ptr, cpu)
            self.assertRegex(cmd.stdout, rf"(?m)^\s*\[{cpu}\]\s+{virt.value_():016x}")

    def test_invalid_address(self):
        """Test invalid physical address input."""
        with self.assertRaises(Exception) as cm:
            self.check_crash_command("ptov invalid_address")
        msg = str(cm.exception).lower()
        self.assertTrue(
            "invalid literal" in msg or "base 16" in msg,
            f"Unexpected error message: {msg}",
        )

    def test_invalid_cpu_spec(self):
        """Test invalid per-CPU specifier."""
        offset = 0x400
        with self.assertRaises(Exception) as cm:
            self.check_crash_command(f"ptov {hex(offset)}:invalid")
        msg = str(cm.exception).lower()
        self.assertIn("invalid cpuspec", msg, f"Unexpected error message: {msg}")


@skip_unless_have_full_mm_support
class TestVtop(CrashCommandTestCase, MmTestCase):
    @skip_unless_have_test_kmod
    def test_kernel(self):
        virt_addr = self.prog["drgn_test_va"].value_()
        phys_addr = self.prog["drgn_test_pa"].value_()

        cmd = self.check_crash_command(f"vtop -k {virt_addr:x}")
        self.assertIn(f"{phys_addr:x}", cmd.stdout)

        self.assertIn("follow_phys", cmd.drgn_option.stdout)
        self.assertEqual(cmd.drgn_option.globals["virtual_address"], virt_addr)
        self.assertEqual(cmd.drgn_option.globals["physical_address"], phys_addr)

    @skip_unless_have_test_kmod
    def test_kernel_multiple(self):
        virt_addrs = [
            self.prog["drgn_test_va"].value_(),
            self.prog["drgn_test_vmalloc_va"].value_(),
        ]
        phys_addrs = [
            self.prog["drgn_test_pa"].value_(),
            self.prog["drgn_test_vmalloc_pa"].value_(),
        ]

        cmd = self.check_crash_command(f"vtop -k {virt_addrs[0]:x} {virt_addrs[1]:x}")
        for phys_addr in phys_addrs:
            self.assertIn(f"{phys_addr:x}", cmd.stdout)

        self.assertIn("follow_phys", cmd.drgn_option.stdout)
        for virt_addr in virt_addrs:
            self.assertIn(hex(virt_addr), cmd.drgn_option.stdout)
        self.assertIn(cmd.drgn_option.globals["virtual_address"], virt_addrs)
        self.assertIn(cmd.drgn_option.globals["physical_address"], phys_addrs)

    @skip_if_highmem
    def test_user(self):
        with self.map_pages() as (_, address, pfns):
            phys_addr = PFN_PHYS(self.prog, pfns[0]).value_()
            cmd = self.check_crash_command(f"vtop -u -c {os.getpid()} {address:x}")
        self.assertIn(f"{phys_addr:x}", cmd.stdout)

        self.assertIn("follow_phys", cmd.drgn_option.stdout)
        self.assertEqual(cmd.drgn_option.globals["virtual_address"], address)
        self.assertEqual(cmd.drgn_option.globals["physical_address"], phys_addr)

    @skip_if_highmem
    def test_user_multiple(self):
        with self.map_pages() as (_, address, pfns):
            virt_addrs = [address, address + mmap.PAGESIZE]
            phys_addrs = [PFN_PHYS(self.prog, pfns[i]).value_() for i in range(2)]
            cmd = self.check_crash_command(
                f"vtop -u -c {os.getpid()} {virt_addrs[0]:x} {virt_addrs[1]:x}"
            )
        for phys_addr in phys_addrs:
            self.assertIn(f"{phys_addr:x}", cmd.stdout)

        self.assertIn("follow_phys", cmd.drgn_option.stdout)
        for virt_addr in virt_addrs:
            self.assertIn(hex(virt_addr), cmd.drgn_option.stdout)
        self.assertIn(cmd.drgn_option.globals["virtual_address"], virt_addrs)
        self.assertIn(cmd.drgn_option.globals["physical_address"], phys_addrs)

    @skip_unless_have_test_kmod
    def test_guess(self):
        virt_addr = self.prog["drgn_test_va"].value_()
        phys_addr = self.prog["drgn_test_pa"].value_()

        cmd = self.check_crash_command(f"vtop {virt_addr:x}")
        self.assertTrue(
            f"{phys_addr:x}" in cmd.stdout or "ambiguous address" in cmd.stderr
        )

        self.assertIn("follow_phys(init_mm", cmd.drgn_option.stdout)
        self.assertIn("follow_phys(mm", cmd.drgn_option.stdout)
        self.assertEqual(cmd.drgn_option.globals["virtual_address"], virt_addr)


class TestSwap(CrashCommandTestCase):
    @skip_unless_have_test_disk
    def test_swap(self):
        with tmp_swaps() as swaps:
            cmd = self.check_crash_command("swap")
            for path, is_file in swaps:
                type = "FILE" if is_file else "PARTITION"
                self.assertRegex(cmd.stdout, f"{type} .* {re.escape(str(path))}")
        self.assertEqual(
            cmd.drgn_option.globals["si"].type_.type_name(), "struct swap_info_struct *"
        )
        self.assertIn("pages", cmd.drgn_option.globals)
        self.assertIn("used_pages", cmd.drgn_option.globals)
        self.assertIn("priority", cmd.drgn_option.globals)
        self.assertIsInstance(cmd.drgn_option.globals["path"], bytes)


class TestVm(CrashCommandTestCase):
    def _test_common(self, cmd):
        self.assertRegex(
            cmd.stdout,
            r"\bMM\s+PGD\s+RSS\s+TOTAL_VM(?:\s+[0-9a-f]+){2}(?:\s+[0-9]+k){2}\b",
        )
        self.assertRegex(
            cmd.stdout, r"\bVMA\s+START\s+END\s+FLAGS\s+FILE(?:\s+[0-9a-f]+){4}"
        )
        self.assertRegex(cmd.stdout, r"\bVMA(?s:.)*/")  # Check for any file path.

    def _test_drgn_option_common(self, cmd):
        for variable in (
            "pid",
            "task",
            "command",
            "mm",
            "pgd",
            "vma",
            "start",
            "end",
            "flags",
        ):
            with self.subTest(variable=variable):
                self.assertIsInstance(cmd.drgn_option.globals[variable], Object)
        self.assertIsInstance(cmd.drgn_option.globals["cpu"], int)
        self.assertIsInstance(cmd.drgn_option.globals["rss"], TaskRss)
        self.assertIsInstance(cmd.drgn_option.globals["file"], bytes)

    def test_no_args(self):
        self.run_crash_command("set -p")

        cmd = self.check_crash_command("vm")
        self.assertIn(f"PID: {os.getpid()}", cmd.stdout)
        self._test_common(cmd)

        self._test_drgn_option_common(cmd)

    def test_tasks(self):
        cmd = self.check_crash_command(f"vm 1 {os.getpid()}")
        foreach_cmd = self.check_crash_command(
            f"foreach 1 {os.getpid()} vm", mode="capture"
        )

        for c in (cmd, foreach_cmd):
            self.assertIn("PID: 1", c.stdout)
            self.assertIn(f"PID: {os.getpid()}", c.stdout)
            self._test_common(c)

        self._test_drgn_option_common(cmd)

        self.assertEqual(cmd.drgn_option.stdout, foreach_cmd.drgn_option.stdout)

    def test_kernel_thread(self):
        cmd = self.check_crash_command("vm 2")

        self.assertRegex(cmd.stdout, r"\bMM\s+PGD\s+RSS\s+TOTAL_VM\s+0\s+0\s+0k\s+0k\b")
        self.assertNotIn("VMA", cmd.stdout)

        self.assertFalse(cmd.drgn_option.globals["mm"])