File: px_process_test.py

package info (click to toggle)
px 3.6.9-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,172 kB
  • sloc: python: 6,445; sh: 205; makefile: 4
file content (390 lines) | stat: -rw-r--r-- 12,459 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
import getpass
import datetime

import os
import pytest

from px import px_process
from . import testutils

from typing import MutableSet


def test_create_process():
    process_builder = px_process.PxProcessBuilder()
    process_builder.pid = 7
    process_builder.ppid = 1
    process_builder.rss_kb = 123
    process_builder.start_time_string = testutils.TIMESTRING
    process_builder.username = "usernamex"
    process_builder.cpu_time = 1.3
    process_builder.memory_percent = 42.7
    process_builder.cmdline = "hej kontinent"
    test_me = process_builder.build(testutils.local_now())

    assert test_me.pid == 7
    assert test_me.ppid == 1
    assert test_me.rss_kb == 123
    assert test_me.username == "usernamex"
    assert test_me.cpu_time_s == "1.3s"
    assert test_me.memory_percent_s == "43%"
    assert test_me.cmdline == "hej kontinent"

    assert test_me.start_time == testutils.TIME
    assert test_me.age_seconds > 0


def test_create_future_process():
    """
    Handle the case where we first look at the clock, then list processes.

    Let's say that:
    1. We look at the clock, and the clock is 12:34:56
    2. We find a newly started process at     12:34:57

    This case used to lead to crashes when we asserted for this:
    https://github.com/walles/px/issues/84
    """
    process_builder = px_process.PxProcessBuilder()

    # This is what we want to test
    process_builder.start_time_string = testutils.TIMESTRING
    before_the_process_was_started = testutils.TIME - datetime.timedelta(seconds=1)

    # These values are required to not fail in other ways
    process_builder.cmdline = "hej kontinent"
    process_builder.pid = 1
    process_builder.rss_kb = 123
    process_builder.username = "johan"

    # Test it!
    test_me = process_builder.build(before_the_process_was_started)
    assert test_me.age_seconds == 0


def test_ps_line_to_process_unicode():
    process = testutils.create_process(cputime="2:14.15")

    assert process.username == "root"
    assert process.cmdline == "/usr/sbin/cupsd -l"


def test_ps_line_to_process_1():
    process = testutils.create_process()

    assert process.pid == 47536
    assert process.ppid == 1234
    assert process.username == "root"
    assert process.cpu_time_s == "0.03s"
    assert process.memory_percent_s == "0%"
    assert process.cmdline == "/usr/sbin/cupsd -l"

    assert process.start_time == testutils.TIME
    assert process.age_seconds > 0


def test_ps_line_to_process_2():
    process = testutils.create_process(cputime="2:14.15")

    assert process.pid == 47536
    assert process.ppid == 1234
    assert process.username == "root"
    assert process.cpu_time_s == "2m14s"
    assert process.memory_percent_s == "0%"
    assert process.cmdline == "/usr/sbin/cupsd -l"

    assert process.start_time == testutils.TIME
    assert process.age_seconds > 0


# From a real-world failure
def test_ps_line_to_process_3():
    process = px_process.ps_line_to_process(
        "  5328"
        "   4432"
        "   123"
        " Thu Feb 25 07:42:36 2016"
        " " + str(os.getuid()) + " 5.5"
        "    1-19:31:31"
        " 19.7"
        " /usr/sbin/mysqld"
        " --basedir=/usr"
        " --datadir=/data/user/mysql"
        " --plugin-dir=/usr/lib/mysql/plugin"
        " --user=mysql"
        " --log-error=/var/log/mysql/mysql.err"
        " --pid-file=/var/run/mysqld/mysqld.pid"
        " --socket=/var/run/mysqld/mysqld.sock"
        " --port=3306",
        testutils.local_now(),
    )
    assert process.username == getpass.getuser()
    assert process.cpu_percent_s == "6%"
    assert process.memory_percent_s == "20%"
    assert process.cpu_time_s == "1d19h"
    assert process.command == "mysqld"


def _validate_references(processes):
    """Fsck the parent / children relationships between all processes"""
    for process in processes:
        if process.pid == 0:
            assert process.parent is None
        else:
            assert process.parent is not None

        assert isinstance(process.children, list)
        if process.parent:
            assert process.parent in processes
            assert process.parent.pid == process.ppid
            assert process in process.parent.children

        for child in process.children:
            assert child in processes
            assert child.parent == process


def _test_get_all():
    all_processes = px_process.get_all()
    assert len(all_processes) >= 4  # Expect at least kernel, init, bash and python
    for process in all_processes:
        assert process is not None

    pids = list(map(lambda p: p.pid, all_processes))

    # Finding ourselves is just confusing...
    assert os.getpid() not in pids

    # ... but all other processes should be there
    assert os.getppid() in pids

    # PID 1 is launchd on OS X, init on Linux.
    #
    # If there's a system where PID 1 doesn't exist this test needs to be modded
    # and that system documented here.
    assert 1 in pids

    # Assert that all contains no duplicate PIDs
    seen_pids: MutableSet[int] = set()
    for process in all_processes:
        pid = process.pid
        assert pid not in seen_pids
        seen_pids.add(pid)

    # Assert that there are processes with the current user name
    current_users_processes = filter(
        lambda process: process.username == getpass.getuser(), all_processes
    )
    assert current_users_processes

    _validate_references(all_processes)

    for process in all_processes:
        assert isinstance(process.cmdline, str)
        assert isinstance(process.username, str)

    if os.environ.get("CI") == "true":
        # Checking for future processes sometimes fails in CI. Since I don't
        # understand it and can't fix it, let's just not look for that in CI. If
        # it happens locally, then that's a good start for troupleshooting.
        #
        # For reference, in
        # https://github.com/Homebrew/homebrew-core/pull/186101 four different
        # runs of "macOS 14-arm64" failed like this, all with processes created
        # around 100s-210s in the future.
        return

    # Ensure no processes are from the future
    now = testutils.local_now()
    for process in all_processes:
        # Processes created in the future = fishy
        assert process.age_seconds >= 0
        assert process.start_time < now


def test_get_all_swedish():
    """
    In Swedish, floating point numbers are indicated with comma, so 4.2 in
    English is 4,2 in Swedish. This test verifies that setting a Swedish locale
    won't mess up our parsing.
    """
    os.environ["LANG"] = "sv_SE.UTF-8"
    os.environ["LC_TIME"] = "sv_SE.UTF-8"
    os.environ["LC_NUMERIC"] = "sv_SE.UTF-8"

    _test_get_all()


def test_get_all_defaultlocale():
    del os.environ["LANG"]
    _test_get_all()


def test_process_eq():
    """Compare two mostly identical processes, where one has a parent and the other one not"""
    process_a = testutils.create_process()

    process_b = testutils.create_process()
    parent = px_process.create_kernel_process(testutils.local_now())
    process_b.parent = parent

    assert process_a != process_b


def test_parse_time():
    assert px_process.parse_time("0:00.03") == 0.03
    assert px_process.parse_time("1:02.03") == 62.03
    assert px_process.parse_time("03:35:32") == 3 * 60 * 60 + 35 * 60 + 32
    assert px_process.parse_time("9-03:35:32") == 9 * 86400 + 3 * 60 * 60 + 35 * 60 + 32

    with pytest.raises(ValueError) as e:
        px_process.parse_time("Constantinople")
    assert "Constantinople" in str(e.value)


def test_match():
    p = testutils.create_process(uid=0, commandline="/usr/libexec/AirPlayXPCHelper")

    assert p.match(None)

    assert p.match("root")
    assert not p.match("roo")

    assert p.match("Air")
    assert p.match("Play")

    assert p.match("air")
    assert p.match("play")

    # Match PID by prefix but not substring. Exact matches are used for
    # searching in ptop. Prefix matching is used to not throw the right answer
    # away while the user is typing their search in ptop. Substring matching has
    # no value.
    assert p.match("47536")
    assert p.match("4753")
    assert not p.match("7536")


def test_seconds_to_str():
    assert px_process.seconds_to_str(0.54321) == "0.54s"
    assert px_process.seconds_to_str(1.54321) == "1.54s"
    assert px_process.seconds_to_str(0.5) == "0.5s"
    assert px_process.seconds_to_str(1.0) == "1.0s"
    assert px_process.seconds_to_str(1) == "1s"

    assert px_process.seconds_to_str(60.54321) == "1m00s"

    assert px_process.seconds_to_str(3598.54321) == "59m58s"
    assert px_process.seconds_to_str(3659.54321) == "1h00m"
    assert px_process.seconds_to_str(3660.54321) == "1h01m"
    assert px_process.seconds_to_str(4260.54321) == "1h11m"

    t1h = 3600
    t1d = t1h * 24
    assert px_process.seconds_to_str(t1d + 3598) == "1d00h"
    assert px_process.seconds_to_str(t1d + 3659) == "1d01h"
    assert px_process.seconds_to_str(t1d + t1h * 11) == "1d11h"


def test_get_command_line_array():
    p = testutils.create_process(commandline="/usr/libexec/AirPlayXPCHelper")
    assert p.get_command_line_array() == ["/usr/libexec/AirPlayXPCHelper"]

    p = testutils.create_process(commandline="/usr/sbin/universalaccessd launchd -s")
    assert p.get_command_line_array() == ["/usr/sbin/universalaccessd", "launchd", "-s"]


def test_get_command_line_array_space_in_binary(tmpdir):
    # Create a file name with a space in it
    spaced_path = tmpdir.join("i contain spaces")
    spaced_path.write_binary(b"")
    spaced_name = str(spaced_path)

    # Verify splitting of the spaced file name
    p = testutils.create_process(commandline=spaced_name)
    assert p.get_command_line_array() == [spaced_name]

    # Verify splitting with more parameters on the line
    p = testutils.create_process(commandline=spaced_name + " parameter")
    assert p.get_command_line_array() == [spaced_name, "parameter"]


def test_command_dotted_prefix():
    # If there's a dot with a lot of text after it we should drop everything
    # before the dot.
    p = testutils.create_process(
        commandline="/.../com.apple.InputMethodKit.TextReplacementService"
    )
    assert p.command == "TextReplacementService"

    # If there's a dot with four characters or less after it, assume it's a file
    # suffix and take the next to last section
    p = testutils.create_process(
        commandline="/.../com.apple.InputMethodKit.TextReplacementService.1234"
    )
    assert p.command == "TextReplacementService"
    p = testutils.create_process(
        commandline="/.../com.apple.InputMethodKit.TextReplacementService.12345"
    )
    assert p.command == "12345"


def test_command_linux_kernelproc():
    p = testutils.create_process(commandline="[ksoftirqd/0]")
    assert p.command == "[ksoftirqd/0]"

    p = testutils.create_process(commandline="[kworker/0:0H]")
    assert p.command == "[kworker/0:0H]"

    p = testutils.create_process(commandline="[rcuob/3]")
    assert p.command == "[rcuob/3]"


def test_command_in_parentheses():
    # Observed on OS X
    p = testutils.create_process(commandline="(python2.7)")
    assert p.command == "(python2.7)"


def test_uid_to_username():
    username = px_process.uid_to_username(os.getuid())
    assert username == getpass.getuser()
    assert isinstance(username, str)

    username = px_process.uid_to_username(456789)
    assert username == "456789"
    assert isinstance(username, str)


def test_resolve_links():
    UNKNOWN_PID = 1323532
    p1 = testutils.create_process(pid=1, ppid=UNKNOWN_PID)
    p2 = testutils.create_process(pid=2, ppid=1)
    processes = {p1.pid: p1, p2.pid: p2}
    px_process.resolve_links(processes, testutils.local_now())

    assert p1.parent is None
    assert p2.parent is p1

    # Verify both equality...
    assert p1.children == [p2]
    # ... and identity of the child.
    assert list(p1.children)[0] is p2


def test_resolve_links_multiple_roots():
    """There can be only one. Root. Of the process tree."""
    processes = px_process.get_all()
    process_map = {p.pid: p for p in processes}
    px_process.resolve_links(process_map, testutils.local_now())

    root0 = processes[0]
    while root0.parent:
        root0 = root0.parent

    for process in processes:
        root = process
        while root.parent:
            root = root.parent

        assert root is root0