1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
import os
from time import sleep
import gdbremote_testcase
import lldbgdbserverutils
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil
class TestGdbRemoteAttachWait(gdbremote_testcase.GdbRemoteTestCaseBase):
    """Exercise the vAttachWait and vAttachOrWait gdb-remote attach packets.

    Each test connects to the debug monitor, sends an attach-by-name packet
    for the inferior built by ``_set_up_inferior``, and then checks that the
    pid in the collected process info matches the inferior that was expected
    to be attached.
    """

    def _set_up_inferior(self):
        """Build the inferior and record how it has to be launched.

        Sets ``_exe_to_attach`` (the name sent in the attach packet),
        ``_exe_to_run`` (the build artifact that is actually spawned) and
        ``_run_args`` (the leading arguments passed to that artifact).
        """
        # The name embeds the test method name and this process's pid;
        # presumably this keeps the attach-by-name target unique per run.
        self._exe_to_attach = '%s_%d' % (self.testMethodName, os.getpid())
        self.build(dictionary={'EXE': self._exe_to_attach,
                               'CXX_SOURCES': 'main.cpp'})

        if self.getPlatform() != "windows":
            # Use a shim to ensure that the process is ready to be attached
            # from the get-go.
            self._exe_to_run = "shim"
            self._run_args = [self.getBuildArtifact(self._exe_to_attach)]
            self.build(dictionary={'EXE': self._exe_to_run,
                                   'CXX_SOURCES': 'shim.cpp'})
        else:
            # No shim on Windows: spawn the attach target directly.
            self._exe_to_run = self._exe_to_attach
            self._run_args = []

    def _launch_inferior(self, args):
        """Spawn ``_exe_to_run`` with ``args`` and return the process object.

        Fails the test if no process was created, if its pid is not
        positive, or if the process is not reported as running.
        """
        inferior = self.spawnSubprocess(
            self.getBuildArtifact(self._exe_to_run), args)
        self.assertIsNotNone(inferior)
        # assertGreater reports the offending pid on failure, unlike
        # assertTrue(pid > 0).
        self.assertGreater(inferior.pid, 0)
        self.assertTrue(
            lldbgdbserverutils.process_is_running(inferior.pid, True))
        return inferior

    def _launch_and_wait_for_init(self):
        """Launch the inferior and wait for its "process_ready" sync file.

        The sync file path is appended to the launch arguments, and this
        method returns only after ``wait_for_file_on_target`` has seen it.
        """
        sync_file_path = lldbutil.append_to_process_working_directory(
            self, "process_ready")
        inferior = self._launch_inferior(self._run_args + [sync_file_path])
        lldbutil.wait_for_file_on_target(self, sync_file_path)
        return inferior

    def _attach_packet(self, packet_type):
        """Return the log line that sends ``packet_type`` for the inferior.

        The line has the form ``read packet: $<packet_type>;<name>#00``,
        where ``<name>`` is ``_exe_to_attach`` hex-encoded.
        """
        encoded_name = lldbgdbserverutils.gdbremote_hex_encode_string(
            self._exe_to_attach)
        return "read packet: ${};{}#00".format(packet_type, encoded_name)

    def _stop_notification_entry(self):
        """Return the sequence entry matching the stop notification.

        The two hex digits following ``$T`` are captured as
        ``stop_signal_hex``.
        """
        return {
            "direction": "send",
            "regex": r"^\$T([0-9a-fA-F]{2})[^#]*#[0-9a-fA-F]{2}$",
            "capture": {1: "stop_signal_hex"},
        }

    def _expect_sequence_context(self):
        """Run the queued gdb-remote sequence and return its context.

        Fails the test if no context comes back.
        """
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        return context

    def _request_attach_then_launch(self, packet_type):
        """Send the attach packet first, then launch and return the inferior."""
        self.test_sequence.add_log_lines(
            [self._attach_packet(packet_type)], True)
        # Run the stream until the attach request has been sent.
        self._expect_sequence_context()

        # Sleep so we're sure that the inferior is launched after we ask for
        # the attach.
        sleep(1)
        return self._launch_inferior(self._run_args)

    def _assert_attached_to(self, inferior):
        """Run the queued sequence and check the reported pid.

        Queues the process-info collection packets, runs everything queued
        so far, and fails the test unless the hex ``pid`` field of the
        parsed process info equals ``inferior.pid``.
        """
        self.add_process_info_collection_packets()
        context = self._expect_sequence_context()

        # Gather process info response.
        process_info = self.parse_process_info_response(context)
        self.assertIsNotNone(process_info)

        # Ensure the process id matches what we expected.
        pid_text = process_info.get('pid')
        self.assertIsNotNone(pid_text)
        self.assertEqual(int(pid_text, base=16), inferior.pid)

    @skipIfWindows  # This test is flaky on Windows
    def test_attach_with_vAttachWait(self):
        """vAttachWait skips a running inferior and attaches to a later one."""
        self._set_up_inferior()
        self.set_inferior_startup_attach_manually()
        server = self.connect_to_debug_monitor()
        # Fail early if the monitor was not handed back
        # (assumes connect_to_debug_monitor returns it -- TODO confirm).
        self.assertIsNotNone(server)
        self.do_handshake()

        # Launch the first inferior (we shouldn't attach to this one).
        self._launch_and_wait_for_init()

        # Launch the second inferior (we SHOULD attach to this one).
        inferior_to_attach = self._request_attach_then_launch("vAttachWait")

        # Make sure the attach succeeded.
        self.test_sequence.add_log_lines(
            [self._stop_notification_entry()], True)
        self._assert_attached_to(inferior_to_attach)

    @skipIfWindows  # This test is flaky on Windows
    def test_launch_before_attach_with_vAttachOrWait(self):
        """vAttachOrWait attaches to an inferior that is already running."""
        self._set_up_inferior()
        self.set_inferior_startup_attach_manually()
        server = self.connect_to_debug_monitor()
        # Fail early if the monitor was not handed back
        # (assumes connect_to_debug_monitor returns it -- TODO confirm).
        self.assertIsNotNone(server)
        self.do_handshake()

        inferior = self._launch_and_wait_for_init()

        # Add attach packets.
        self.test_sequence.add_log_lines([
            # Do the attach.
            self._attach_packet("vAttachOrWait"),
            # Expect a stop notification from the attach.
            self._stop_notification_entry(),
        ], True)
        self._assert_attached_to(inferior)

    @skipIfWindows  # This test is flaky on Windows
    def test_launch_after_attach_with_vAttachOrWait(self):
        """vAttachOrWait waits for an inferior launched after the request."""
        self._set_up_inferior()
        self.set_inferior_startup_attach_manually()
        server = self.connect_to_debug_monitor()
        # Fail early if the monitor was not handed back
        # (assumes connect_to_debug_monitor returns it -- TODO confirm).
        self.assertIsNotNone(server)
        self.do_handshake()

        # Launch the inferior only after the attach has been requested.
        inferior = self._request_attach_then_launch("vAttachOrWait")

        # Make sure the attach succeeded.
        self.test_sequence.add_log_lines(
            [self._stop_notification_entry()], True)
        self._assert_attached_to(inferior)
|