1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
"""Test the pre-kill hook on Darwin."""
from __future__ import print_function
# system imports
from multiprocessing import Process, Queue
import platform
import re
from unittest import main, TestCase
# third party
from six import StringIO
def do_child_process(child_work_queue, parent_work_queue, verbose):
import os
pid = os.getpid()
if verbose:
print("child: pid {} started, sending to parent".format(pid))
parent_work_queue.put(pid)
if verbose:
print("child: waiting for shut-down request from parent")
child_work_queue.get()
if verbose:
print("child: received shut-down request. Child exiting.")
class DarwinPreKillTestCase(TestCase):
def __init__(self, methodName):
super(DarwinPreKillTestCase, self).__init__(methodName)
self.process = None
self.child_work_queue = None
self.verbose = False
def tearDown(self):
if self.verbose:
print("parent: sending shut-down request to child")
if self.process:
self.child_work_queue.put("hello, child")
self.process.join()
if self.verbose:
print("parent: child is fully shut down")
def test_sample(self):
# Ensure we're Darwin.
if platform.system() != 'Darwin':
self.skipTest("requires a Darwin-based OS")
# Start the child process.
self.child_work_queue = Queue()
parent_work_queue = Queue()
self.process = Process(target=do_child_process,
args=(self.child_work_queue, parent_work_queue,
self.verbose))
if self.verbose:
print("parent: starting child")
self.process.start()
# Wait for the child to report its pid. Then we know we're running.
if self.verbose:
print("parent: waiting for child to start")
child_pid = parent_work_queue.get()
# Sample the child process.
from darwin import do_pre_kill
context_dict = {
"archs": [platform.machine()],
"platform_name": None,
"platform_url": None,
"platform_working_dir": None
}
if self.verbose:
print("parent: running pre-kill action on child")
output_io = StringIO()
do_pre_kill(child_pid, context_dict, output_io)
output = output_io.getvalue()
if self.verbose:
print("parent: do_pre_kill() wrote the following output:", output)
self.assertIsNotNone(output)
# We should have a line with:
# Process: .* [{pid}]
process_re = re.compile(r"Process:[^[]+\[([^]]+)\]")
match = process_re.search(output)
self.assertIsNotNone(match, "should have found process id for "
"sampled process")
self.assertEqual(1, len(match.groups()))
self.assertEqual(child_pid, int(match.group(1)))
# We should see a Call graph: section.
callgraph_re = re.compile(r"Call graph:")
match = callgraph_re.search(output)
self.assertIsNotNone(match, "should have found the Call graph section"
"in sample output")
# We should see a Binary Images: section.
binary_images_re = re.compile(r"Binary Images:")
match = binary_images_re.search(output)
self.assertIsNotNone(match, "should have found the Binary Images "
"section in sample output")
if __name__ == "__main__":
main()
|