File: search_kernel_memory.py

package info (click to toggle)
drgn 0.0.33-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,892 kB
  • sloc: python: 59,081; ansic: 51,400; awk: 423; makefile: 339; sh: 113
file content (113 lines) | stat: -rwxr-xr-x 3,908 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env drgn
# Copyright (c) Meta Platforms, Inc. and affiliates.
# SPDX-License-Identifier: LGPL-2.1-or-later

"""
Search kernel memory for a byte string. This only works if the kernel was built
with CONFIG_PROC_KCORE=y.
"""

import argparse
import sys

from drgn import FaultError, Object
from drgn.helpers.common.memory import identify_address
from drgn.helpers.linux.list import list_for_each_entry
from drgn.helpers.linux.mm import for_each_vmap_area, virt_to_page


def virt_to_vmap_address(prog, addr):
    page = virt_to_page(addr)
    for va in for_each_vmap_area(prog):
        vm = va.vm.read_()
        if vm:
            for i, va_page in enumerate(
                Object(
                    prog, prog.array_type(page.type_, vm.nr_pages), address=vm.pages
                ).read_()
            ):
                if va_page == page:
                    return (
                        va.va_start.value_()
                        + (i << prog["PAGE_SHIFT"])
                        + (addr & (prog["PAGE_SIZE"].value_() - 1))
                    )
    return None


def search_memory(prog, needle):
    if isinstance(needle, Object):
        needle = needle.to_bytes_()

    KCORE_RAM = prog["KCORE_RAM"]
    CHUNK_SIZE = 1024 * 1024
    PAGE_SIZE = prog["PAGE_SIZE"].value_()
    for kc in list_for_each_entry(
        "struct kcore_list", prog["kclist_head"].address_of_(), "list"
    ):
        if kc.type != KCORE_RAM:
            continue
        addr = kc.addr.value_()
        end = addr + kc.size.value_()
        while addr < end:
            try:
                buf = prog.read(addr, min(CHUNK_SIZE, end - addr))
            except FaultError:
                # We start with a large chunk size to reduce the overhead of
                # reading memory. However, if we're reading from a core dump,
                # reading with a large chunk size may fault on excluded pages.
                if CHUNK_SIZE > PAGE_SIZE:
                    # We faulted with a large chunk size. Fall back to
                    # page-by-page and retry.
                    CHUNK_SIZE = PAGE_SIZE
                else:
                    # We're already reading page-by-page. Skip this page.
                    addr += CHUNK_SIZE
                continue

            i = 0
            while i < len(buf):
                i = buf.find(needle, i)
                if i < 0:
                    break

                vmap_address = virt_to_vmap_address(prog, addr + i)
                if vmap_address is not None:
                    identity = identify_address(prog, vmap_address)
                else:
                    identity = identify_address(prog, addr + i)

                if identity is None:
                    print(hex(addr + i))
                else:
                    print(hex(addr + i), identity)
                i += 1

            addr += CHUNK_SIZE


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Search kernel memory for a value and print the addresses where it is found"
    )
    group = parser.add_argument_group(
        title="search target",
        description="If none of these are given, search for a byte string read from standard input.",
    ).add_mutually_exclusive_group()
    group.add_argument("--string", help="search for an ASCII/UTF-8 string")
    group.add_argument("--hex", help="search for a byte string, given in hexadecimal")
    group.add_argument(
        "--address", help="search for an address-sized integer, given in hexadecimal"
    )
    args = parser.parse_args()

    if args.string is not None:
        needle = args.string.encode()
    elif args.hex is not None:
        needle = bytes.fromhex(args.hex)
    elif args.address is not None:
        needle = Object(prog, "void *", int(args.address, 16))
    else:
        needle = sys.stdin.buffer.read()

    search_memory(prog, needle)