File: proc.py

package info (click to toggle)
pwntools 4.14.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 18,436 kB
  • sloc: python: 59,156; ansic: 48,063; asm: 45,030; sh: 396; makefile: 256
file content (424 lines) | stat: -rw-r--r-- 10,573 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
from __future__ import absolute_import
from __future__ import division

import errno
import socket
import sys
import time

import psutil

from pwnlib import tubes
from pwnlib.log import getLogger
from .net import sock_match
from pwnlib.timeout import Timeout

log = getLogger(__name__)

all_pids = psutil.pids

def pidof(target):
    """pidof(target) -> int list

    Get PID(s) of `target`.  The returned PID(s) depends on the type of `target`:

    - :class:`str`: PIDs of all processes with a name matching `target`.
    - :class:`pwnlib.tubes.process.process`: singleton list of the PID of `target`.
    - :class:`pwnlib.tubes.sock.sock`: singleton list of the PID at the
      remote end of `target` if it is running on the host.  Otherwise an
      empty list.

    Arguments:
        target(object):  The target whose PID(s) to find.

    Returns:
        A list of found PIDs.

    Example:

        >>> l = tubes.listen.listen()
        >>> p = process(['curl', '-s', 'http://127.0.0.1:%d'%l.lport])
        >>> pidof(p) == pidof(l) == pidof(('127.0.0.1', l.lport))
        True
    """
    if isinstance(target, tubes.ssh.ssh_channel):
        return [target.pid]

    elif isinstance(target, tubes.sock.sock):
        local  = target.sock.getsockname()
        remote = target.sock.getpeername()
        match = sock_match(remote, local, target.family, target.type)
        return [c.pid for c in psutil.net_connections() if match(c)]

    elif isinstance(target, tuple):
        match = sock_match(None, target)
        return [c.pid for c in psutil.net_connections() if match(c)]

    elif isinstance(target, tubes.process.process):
        return [target.proc.pid]

    else:
        return pid_by_name(target)

def pid_by_name(name):
    """pid_by_name(name) -> int list

    Arguments:
        name (str): Name of program.

    Returns:
        List of PIDs matching `name` sorted by lifetime, youngest to oldest.

    Example:

        >>> os.getpid() in pid_by_name(name(os.getpid()))
        True
    """
    def match(p):
        if p.status() == 'zombie':
            return False
        if p.name() == name:
            return True
        try:
            if p.exe() == name:
                return True
        except Exception:
            pass
        return False

    processes = (p for p in psutil.process_iter() if match(p))

    processes = sorted(processes, key=lambda p: p.create_time(), reverse=True)

    return [p.pid for p in processes]

def name(pid):
    """name(pid) -> str

    Arguments:
        pid (int): PID of the process.

    Returns:
        Name of process as listed in ``/proc/<pid>/status``.

    Example:

        >>> p = process('cat')
        >>> name(p.pid)
        'cat'
    """
    return psutil.Process(pid).name()

def parent(pid):
    """parent(pid) -> int

    Arguments:
        pid (int): PID of the process.

    Returns:
        Parent PID as listed in ``/proc/<pid>/status`` under ``PPid``,
        or 0 if there is not parent.
    """
    try:
         return psutil.Process(pid).parent().pid
    except Exception:
         return 0

def children(ppid):
    """children(ppid) -> int list

    Arguments:
        pid (int): PID of the process.

    Returns:
        List of PIDs of whose parent process is `pid`.
    """
    return [p.pid for p in psutil.Process(ppid).children()]

def ancestors(pid):
    """ancestors(pid) -> int list

    Arguments:
        pid (int): PID of the process.

    Returns:
        List of PIDs of whose parent process is `pid` or an ancestor of `pid`.

    Example:

        >>> ancestors(os.getpid()) # doctest: +ELLIPSIS
        [..., 1]
    """
    pids = []
    while pid != 0:
         pids.append(pid)
         pid = parent(pid)
    return pids

def descendants(pid):
    """descendants(pid) -> dict

    Arguments:
        pid (int): PID of the process.

    Returns:
        Dictionary mapping the PID of each child of `pid` to it's descendants.

    Example:

        >>> d = descendants(os.getppid())
        >>> os.getpid() in d.keys()
        True
    """
    this_pid = pid
    allpids = all_pids()
    ppids = {}
    def _parent(pid):
         if pid not in ppids:
             ppids[pid] = parent(pid)
         return ppids[pid]
    def _children(ppid):
         return [pid for pid in allpids if _parent(pid) == ppid]
    def _loop(ppid):
         return {pid: _loop(pid) for pid in _children(ppid)}
    return _loop(pid)

def exe(pid):
    """exe(pid) -> str

    Arguments:
        pid (int): PID of the process.

    Returns:
        The path of the binary of the process. I.e. what ``/proc/<pid>/exe`` points to.

    Example:

        >>> exe(os.getpid()) == os.path.realpath(sys.executable)
        True
    """
    return psutil.Process(pid).exe()

def cwd(pid):
    """cwd(pid) -> str

    Arguments:
        pid (int): PID of the process.

    Returns:
        The path of the process's current working directory. I.e. what
        ``/proc/<pid>/cwd`` points to.

    Example:

        >>> cwd(os.getpid()) == os.getcwd()
        True
    """
    return psutil.Process(pid).cwd()

def cmdline(pid):
    """cmdline(pid) -> str list

    Arguments:
        pid (int): PID of the process.

    Returns:
        A list of the fields in ``/proc/<pid>/cmdline``.

    Example:

        >>> 'py' in ''.join(cmdline(os.getpid()))
        True
    """
    return psutil.Process(pid).cmdline()

def memory_maps(pid):
    """memory_maps(pid) -> list
    
    Arguments:
        pid (int): PID of the process.

    Returns:
        A list of the memory mappings in the process.

    Example:

        >>> maps = memory_maps(os.getpid())
        >>> [(m.path, m.perms) for m in maps if '[stack]' in m.path]
        [('[stack]', 'rw-p')]

    """
    return psutil.Process(pid).memory_maps(grouped=False)

def stat(pid):
    """stat(pid) -> str list

    Arguments:
        pid (int): PID of the process.

    Returns:
        A list of the values in ``/proc/<pid>/stat``, with the exception that ``(`` and ``)`` has been removed from around the process name.

    Example:

        >>> stat(os.getpid())[2]
        'R'
    """
    with open('/proc/%d/stat' % pid) as fd:
         s = fd.read()
    # filenames can have ( and ) in them, dammit
    i = s.find('(')
    j = s.rfind(')')
    name = s[i+1:j]
    return s[:i].split() + [name] + s[j+1:].split()

def starttime(pid):
    """starttime(pid) -> float

    Arguments:
        pid (int): PID of the process.

    Returns:
        The time (in seconds) the process started after system boot

    Example:

        >>> starttime(os.getppid()) <= starttime(os.getpid())
        True
    """
    return psutil.Process(pid).create_time() - psutil.boot_time()

def status(pid):
    """status(pid) -> dict

    Get the status of a process.

    Arguments:
        pid (int): PID of the process.

    Returns:
        The contents of ``/proc/<pid>/status`` as a dictionary.
    """
    out = {}
    try:
        with open('/proc/%d/status' % pid) as fd:
            for line in fd:
                if ':' not in line:
                    continue
                i = line.index(':')
                key = line[:i]
                val = line[i + 2:-1] # initial :\t and trailing \n
                out[key] = val
    except OSError as e:
        if e.errno == errno.ENOENT:
            raise ValueError('No process with PID %d' % pid)
        else:
            raise
    return out

def _tracer_windows(pid):
    import ctypes
    from ctypes import wintypes

    def _check_bool(result, func, args):
        if not result:
            raise ctypes.WinError(ctypes.get_last_error())
        return args

    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    OpenProcess = kernel32.OpenProcess 
    OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
    OpenProcess.restype = wintypes.HANDLE
    OpenProcess.errcheck = _check_bool

    CheckRemoteDebuggerPresent = kernel32.CheckRemoteDebuggerPresent
    CheckRemoteDebuggerPresent.argtypes = [wintypes.HANDLE, ctypes.POINTER(wintypes.BOOL)]
    CheckRemoteDebuggerPresent.restype = wintypes.BOOL
    CheckRemoteDebuggerPresent.errcheck = _check_bool

    CloseHandle = kernel32.CloseHandle
    CloseHandle.argtypes = [wintypes.HANDLE]
    CloseHandle.restype = wintypes.BOOL
    CloseHandle.errcheck = _check_bool

    PROCESS_QUERY_INFORMATION = 0x0400
    proc_handle = OpenProcess(PROCESS_QUERY_INFORMATION, False, pid)
    present = wintypes.BOOL()
    CheckRemoteDebuggerPresent(proc_handle, ctypes.byref(present))
    ret = 0
    if present.value:
        ret = pid
    CloseHandle(proc_handle)

    return ret

def tracer(pid):
    """tracer(pid) -> int

    Arguments:
        pid (int): PID of the process.

    Returns:
        PID of the process tracing `pid`, or None if no `pid` is not being traced.

    Example:

        >>> tracer(os.getpid()) is None
        True
    """
    if sys.platform == 'win32':
        tpid = _tracer_windows(pid)
    else:
        tpid = int(status(pid)['TracerPid'])
    return tpid if tpid > 0 else None

def state(pid):
    """state(pid) -> str

    Arguments:
        pid (int): PID of the process.

    Returns:
        State of the process as listed in ``/proc/<pid>/status``.  See `proc(5)` for details.

    Example:

        >>> state(os.getpid())
        'R (running)'
    """
    return status(pid)['State']

def wait_for_debugger(pid, debugger_pid=None):
    """wait_for_debugger(pid, debugger_pid=None) -> None

    Sleeps until the process with PID `pid` is being traced.
    If debugger_pid is set and debugger exits, raises an error.

    Arguments:
        pid (int): PID of the process.

    Returns:
        None
    """
    t = Timeout()
    with t.countdown(timeout=15):
        with log.waitfor('Waiting for debugger') as l:
            while t.timeout and tracer(pid) is None:
                if debugger_pid:
                    debugger = psutil.Process(debugger_pid)
                    try:
                        debugger.wait(0.01)
                    except psutil.TimeoutExpired:
                        pass
                    else:
                        debugger_pid = 0
                        break
                else:
                    time.sleep(0.01)

            if tracer(pid):
                l.success()
            elif debugger_pid == 0:
                l.failure("debugger exited! (maybe check /proc/sys/kernel/yama/ptrace_scope)")
            else:
                l.failure('Debugger did not attach to pid %d within 15 seconds', pid)