File: test_common.py

package info (click to toggle)
drgn 0.0.33-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,892 kB
  • sloc: python: 59,081; ansic: 51,400; awk: 423; makefile: 339; sh: 113
file content (271 lines) | stat: -rw-r--r-- 11,310 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# Copyright (c) Meta Platforms, Inc. and affiliates.
# SPDX-License-Identifier: LGPL-2.1-or-later

from contextlib import redirect_stdout
import io

from _drgn_util.platform import NORMALIZED_MACHINE_NAME
from drgn import offsetof, sizeof
from drgn.helpers.common.memory import (
    IdentifiedSymbol,
    identify_address,
    identify_address_all,
    print_annotated_memory,
)
from drgn.helpers.common.stack import print_annotated_stack
from drgn.helpers.linux.common import (
    IdentifiedSlabObject,
    IdentifiedTaskStack,
    IdentifiedTaskStruct,
    IdentifiedVmap,
)
from drgn.helpers.linux.mm import pfn_to_virt
from drgn.helpers.linux.sched import idle_task
from tests.linux_kernel import (
    HAVE_FULL_MM_SUPPORT,
    LinuxKernelTestCase,
    online_cpus,
    skip_if_slob,
    skip_unless_have_full_mm_support,
    skip_unless_have_stack_tracing,
    skip_unless_have_test_kmod,
)


class TestIdentifyAddress(LinuxKernelTestCase):
    """Tests for identify_address() and identify_address_all()."""

    def _assert_stack_backing(self, identified):
        # Shared tail of the task stack tests: check what, if anything, is
        # reported after the leading IdentifiedTaskStack entry, depending on
        # the drgn_test_vmap_stack_enabled and drgn_test_slab_stack_enabled
        # test variables.
        if self.prog["drgn_test_vmap_stack_enabled"]:
            backing = IdentifiedVmap
        elif self.prog["drgn_test_slab_stack_enabled"]:
            backing = IdentifiedSlabObject
        else:
            # Neither flag is set: the task stack is the only match.
            self.assertEqual(len(identified), 1)
            return
        self.assertIsInstance(identified[1], backing)
        self.assertEqual(len(identified), 2)

    @skip_unless_have_test_kmod
    def test_identify_symbol(self):
        # The start address of drgn_test_function must be identified as a
        # symbol first and a vmap second, with no further matches.
        address = self.prog.symbol("drgn_test_function").address

        results = list(identify_address_all(self.prog, address))
        symbol_result = results[0]
        self.assertIsInstance(symbol_result, IdentifiedSymbol)
        self.assertEqual(
            str(symbol_result), "function symbol: drgn_test_function+0x0"
        )
        # Module symbols are also vmapped.
        self.assertIsInstance(results[1], IdentifiedVmap)
        self.assertEqual(len(results), 2)

        # One byte past the start, the offset appears in the description.
        description = identify_address(self.prog, address + 1)
        self.assertEqual(description, "function symbol: drgn_test_function+0x1")

    @skip_unless_have_full_mm_support
    @skip_unless_have_test_kmod
    def test_identify_slab_cache(self):
        # Every object in the small and big test arrays must be described
        # according to whether drgn_test_slob is set.
        for size in ("small", "big"):
            with self.subTest(size=size):
                objects = self.prog[f"drgn_test_{size}_slab_objects"]
                slob = bool(self.prog["drgn_test_slob"])

                for obj in objects:
                    description = identify_address(obj)
                    if not slob:
                        self.assertEqual(
                            description, f"slab object: drgn_test_{size}+0x0"
                        )
                    elif size == "small":
                        self.assertEqual(description, "unknown slab object")
                    else:
                        self.assertIsNone(description)

    @skip_unless_have_full_mm_support
    @skip_unless_have_test_kmod
    @skip_if_slob
    def test_identify_task(self):
        # The test kthread pointer must be identified as its task_struct
        # first and as a slab object second.
        kthread = self.prog["drgn_test_kthread"]

        results = list(identify_address_all(kthread))
        task_result = results[0]
        self.assertIsInstance(task_result, IdentifiedTaskStruct)
        self.assertEqual(task_result.task, kthread)
        self.assertEqual(
            str(task_result),
            f"task: {kthread.pid.value_()} (drgn_test_kthre)",
        )
        self.assertIsInstance(results[1], IdentifiedSlabObject)
        self.assertEqual(len(results), 2)

    @skip_unless_have_full_mm_support
    @skip_unless_have_test_kmod
    @skip_if_slob
    def test_identify_task_member(self):
        # An address inside the task_struct (its pid member) is described
        # with the member's offset appended.
        pid_offset = offsetof(self.prog.type("struct task_struct"), "pid")
        kthread = self.prog["drgn_test_kthread"]

        description = identify_address(self.prog, kthread.pid.address_)
        self.assertEqual(
            description,
            f"task: {kthread.pid.value_()} (drgn_test_kthre) +{pid_offset:#x}",
        )

    def test_identify_idle_task_0(self):
        # init_task must be identified as the idle task of CPU 0, followed by
        # a symbol match; additional matches are tolerated.
        init_task_address = self.prog["init_task"].address_of_()

        results = list(identify_address_all(init_task_address))
        self.assertIsInstance(results[0], IdentifiedTaskStruct)
        self.assertEqual(results[0].task, idle_task(self.prog, 0))
        self.assertIsInstance(results[1], IdentifiedSymbol)
        self.assertGreaterEqual(len(results), 2)

    @skip_unless_have_full_mm_support
    @skip_if_slob
    def test_identify_idle_task_1(self):
        # Use the first online CPU other than CPU 0; skip if there is none.
        cpu = next((n for n in online_cpus() if n > 0), None)
        if cpu is None:
            self.skipTest("online CPU > 0 not found")

        task = idle_task(self.prog, cpu)
        results = list(identify_address_all(task))
        self.assertIsInstance(results[0], IdentifiedTaskStruct)
        self.assertEqual(results[0].task, task)
        self.assertIsInstance(results[1], IdentifiedSlabObject)
        self.assertEqual(len(results), 2)

    @skip_unless_have_test_kmod
    def test_identify_vmap(self):
        # Run once without a cache and once with an (initially empty) cache.
        for label, cache in (("uncached", None), ("cached", {})):
            with self.subTest(label):
                description = identify_address(
                    self.prog["drgn_test_vmalloc_va"], cache=cache
                )
                self.assertTrue(description.startswith("vmap: 0x"))

    @skip_unless_have_full_mm_support
    @skip_unless_have_test_kmod
    def test_identify_task_stack(self):
        if self.prog["drgn_test_slob"] and self.prog["drgn_test_slab_stack_enabled"]:
            self.skipTest("test does not support SLOB")

        # Run once without a cache and once with an (initially empty) cache.
        for label, cache in (("uncached", None), ("cached", {})):
            with self.subTest(label):
                kthread = self.prog["drgn_test_kthread"]
                # 1234 == 0x4d2, the offset expected in the description.
                stack_address = kthread.stack.value_() + 1234

                results = list(
                    identify_address_all(self.prog, stack_address, cache=cache)
                )
                self.assertIsInstance(results[0], IdentifiedTaskStack)
                self.assertEqual(
                    str(results[0]),
                    f"task stack: {kthread.pid.value_()} (drgn_test_kthre) +0x4d2",
                )
                self._assert_stack_backing(results)

    @skip_unless_have_full_mm_support
    def test_identify_idle_task_0_stack(self):
        results = list(identify_address_all(self.prog["init_task"].stack))
        self.assertIsInstance(results[0], IdentifiedTaskStack)
        self.assertEqual(results[0].task, idle_task(self.prog, 0))

        if NORMALIZED_MACHINE_NAME != "s390x":
            self.assertIsInstance(results[1], IdentifiedSymbol)
            self.assertGreaterEqual(len(results), 2)
            return

        # s390x between Linux kernel commits ce3dc447493f ("s390: add support
        # for virtually mapped kernel stacks") (in v4.20) and 944c78376a39
        # ("s390: use init_thread_union aka initial stack for the first
        # process") (in v6.4) allocates init_task.stack.
        if self.prog["drgn_test_vmap_stack_enabled"]:
            allowed = (IdentifiedSymbol, IdentifiedVmap)
        elif self.prog["drgn_test_slab_stack_enabled"]:
            allowed = (IdentifiedSymbol, IdentifiedSlabObject)
        elif len(results) > 1:
            allowed = IdentifiedSymbol
        else:
            # Only the task stack was identified; nothing more to check.
            return
        self.assertIsInstance(results[1], allowed)
        self.assertEqual(len(results), 2)

    @skip_unless_have_full_mm_support
    @skip_unless_have_test_kmod
    def test_identify_idle_task_1_stack(self):
        if self.prog["drgn_test_slob"] and self.prog["drgn_test_slab_stack_enabled"]:
            self.skipTest("test does not support SLOB")

        # Use the first online CPU other than CPU 0; skip if there is none.
        cpu = next((n for n in online_cpus() if n > 0), None)
        if cpu is None:
            self.skipTest("online CPU > 0 not found")

        task = idle_task(self.prog, cpu)

        # Run once without a cache and once with an (initially empty) cache.
        for label, cache in (("uncached", None), ("cached", {})):
            with self.subTest(label):
                results = list(identify_address_all(task.stack, cache=cache))
                self.assertIsInstance(results[0], IdentifiedTaskStack)
                self.assertEqual(results[0].task, task)
                self._assert_stack_backing(results)

    @skip_unless_have_test_kmod
    def test_identify_page(self):
        page = self.prog["drgn_test_page"]
        pfn = self.prog["drgn_test_pfn"].value_()

        # The page pointer itself is described by its PFN alone.
        self.assertEqual(identify_address(page), f"page: pfn {pfn}")

        # The address of a member of the page additionally reports the
        # member's offset.
        mapping_offset = offsetof(self.prog.type("struct page"), "mapping")
        self.assertEqual(
            identify_address(page.mapping.address_of_()),
            f"page: pfn {pfn} +{mapping_offset:#x}",
        )

    @skip_unless_have_full_mm_support
    @skip_unless_have_test_kmod
    def test_identify_unrecognized(self):
        start_addr = pfn_to_virt(self.prog["min_low_pfn"]).value_()
        # On s390x, the start address is 0, and identify_address() doesn't
        # allow a negative address.
        if start_addr > 0:
            below_start = identify_address(self.prog, start_addr - 1)
            self.assertIsNone(below_start)

        test_va = self.prog["drgn_test_va"]
        self.assertIsNone(identify_address(self.prog, test_va))


class TestPrintAnnotatedMemory(LinuxKernelTestCase):
    """Tests for print_annotated_memory()."""

    @skip_unless_have_full_mm_support
    @skip_unless_have_test_kmod
    def test_print_annotated_memory(self):
        # Dump the memory of the drgn_test_small_slab_objects variable while
        # capturing everything written to stdout.
        with redirect_stdout(io.StringIO()) as captured:
            slab_objects = self.prog["drgn_test_small_slab_objects"]
            print_annotated_memory(
                self.prog, slab_objects.address_, sizeof(slab_objects)
            )

        # For CONFIG_SLOB, we cannot find slab objects. However,
        # print_annotated_memory() should still function with no error. So we
        # don't skip the test here: just skip the assertion.
        if self.prog["drgn_test_slob"]:
            return
        self.assertIn("slab object: drgn_test_small+0x0", captured.getvalue())


class TestPrintAnnotatedStack(LinuxKernelTestCase):
    """Tests for print_annotated_stack()."""

    @skip_unless_have_stack_tracing
    @skip_unless_have_test_kmod
    def test_print_annotated_stack(self):
        trace = self.prog.stack_trace(self.prog["drgn_test_kthread"])

        # Capture everything print_annotated_stack() writes to stdout.
        with redirect_stdout(io.StringIO()) as captured:
            print_annotated_stack(trace)
        output = captured.getvalue()

        # The schedule-related fragments are always expected; the slab
        # annotation is only checked with full MM support and when
        # drgn_test_slob is not set.
        expected_fragments = ["[function symbol: schedule", "schedule at "]
        if HAVE_FULL_MM_SUPPORT and not self.prog["drgn_test_slob"]:
            expected_fragments.insert(0, "slab object: drgn_test_small")

        for fragment in expected_fragments:
            self.assertIn(fragment, output)