1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
|
from collections import namedtuple
import dis
from functools import partial
import itertools
import os.path
import sys
from _pydevd_frame_eval.vendored import bytecode
from _pydevd_frame_eval.vendored.bytecode.instr import Instr, Label
from _pydev_bundle import pydev_log
from _pydevd_frame_eval.pydevd_frame_tracing import _pydev_stop_at_break, _pydev_needs_stop_at_break
DEBUG = False
class DebugHelper(object):
def __init__(self):
self._debug_dir = os.path.join(os.path.dirname(__file__), "debug_info")
try:
os.makedirs(self._debug_dir)
except:
pass
self._next = partial(next, itertools.count(0))
def _get_filename(self, op_number=None, prefix=""):
if op_number is None:
op_number = self._next()
name = "%03d_before.txt" % op_number
else:
name = "%03d_change.txt" % op_number
filename = os.path.join(self._debug_dir, prefix + name)
return filename, op_number
def write_bytecode(self, b, op_number=None, prefix=""):
filename, op_number = self._get_filename(op_number, prefix)
with open(filename, "w") as stream:
bytecode.dump_bytecode(b, stream=stream, lineno=True)
return op_number
def write_dis(self, code_to_modify, op_number=None, prefix=""):
filename, op_number = self._get_filename(op_number, prefix)
with open(filename, "w") as stream:
stream.write("-------- ")
stream.write("-------- ")
stream.write("id(code_to_modify): %s" % id(code_to_modify))
stream.write("\n\n")
dis.dis(code_to_modify, file=stream)
return op_number
_CodeLineInfo = namedtuple("_CodeLineInfo", "line_to_offset, first_line, last_line")
# Note: this method has a version in cython too (that one is usually used, this is just for tests).
def _get_code_line_info(code_obj):
line_to_offset = {}
first_line = None
last_line = None
for offset, line in dis.findlinestarts(code_obj):
if line is not None:
line_to_offset[line] = offset
if line_to_offset:
first_line = min(line_to_offset)
last_line = max(line_to_offset)
return _CodeLineInfo(line_to_offset, first_line, last_line)
if DEBUG:
debug_helper = DebugHelper()
def get_instructions_to_add(stop_at_line, _pydev_stop_at_break=_pydev_stop_at_break, _pydev_needs_stop_at_break=_pydev_needs_stop_at_break):
"""
This is the bytecode for something as:
if _pydev_needs_stop_at_break():
_pydev_stop_at_break()
but with some special handling for lines.
"""
# Good reference to how things work regarding line numbers and jumps:
# https://github.com/python/cpython/blob/3.6/Objects/lnotab_notes.txt
# Usually use a stop line -1, but if that'd be 0, using line +1 is ok too.
spurious_line = stop_at_line - 1
if spurious_line <= 0:
spurious_line = stop_at_line + 1
label = Label()
return [
# -- if _pydev_needs_stop_at_break():
Instr("LOAD_CONST", _pydev_needs_stop_at_break, lineno=stop_at_line),
Instr("LOAD_CONST", stop_at_line, lineno=stop_at_line),
Instr("CALL_FUNCTION", 1, lineno=stop_at_line),
Instr("POP_JUMP_IF_FALSE", label, lineno=stop_at_line),
# -- _pydev_stop_at_break()
#
# Note that this has line numbers -1 so that when the NOP just below
# is executed we have a spurious line event.
Instr("LOAD_CONST", _pydev_stop_at_break, lineno=spurious_line),
Instr("LOAD_CONST", stop_at_line, lineno=spurious_line),
Instr("CALL_FUNCTION", 1, lineno=spurious_line),
Instr("POP_TOP", lineno=spurious_line),
# Reason for the NOP: Python will give us a 'line' trace event whenever we forward jump to
# the first instruction of a line, so, in the case where we haven't added a programmatic
# breakpoint (either because we didn't hit a breakpoint anymore or because it was already
# tracing), we don't want the spurious line event due to the line change, so, we make a jump
# to the instruction right after the NOP so that the spurious line event is NOT generated in
# this case (otherwise we'd have a line event even if the line didn't change).
Instr("NOP", lineno=stop_at_line),
label,
]
class _Node(object):
def __init__(self, data):
self.prev = None
self.next = None
self.data = data
def append(self, data):
node = _Node(data)
curr_next = self.next
node.next = self.next
node.prev = self
self.next = node
if curr_next is not None:
curr_next.prev = node
return node
def prepend(self, data):
node = _Node(data)
curr_prev = self.prev
node.prev = self.prev
node.next = self
self.prev = node
if curr_prev is not None:
curr_prev.next = node
return node
class _HelperBytecodeList(object):
"""
A helper double-linked list to make the manipulation a bit easier (so that we don't need
to keep track of indices that change) and performant (because adding multiple items to
the middle of a regular list isn't ideal).
"""
def __init__(self, lst=None):
self._head = None
self._tail = None
if lst:
node = self
for item in lst:
node = node.append(item)
def append(self, data):
if self._tail is None:
node = _Node(data)
self._head = self._tail = node
return node
else:
node = self._tail = self.tail.append(data)
return node
@property
def head(self):
node = self._head
# Manipulating the node directly may make it unsynchronized.
while node.prev:
self._head = node = node.prev
return node
@property
def tail(self):
node = self._tail
# Manipulating the node directly may make it unsynchronized.
while node.next:
self._tail = node = node.next
return node
def __iter__(self):
node = self.head
while node:
yield node.data
node = node.next
_PREDICT_TABLE = {
"LIST_APPEND": ("JUMP_ABSOLUTE",),
"SET_ADD": ("JUMP_ABSOLUTE",),
"GET_ANEXT": ("LOAD_CONST",),
"GET_AWAITABLE": ("LOAD_CONST",),
"DICT_MERGE": ("CALL_FUNCTION_EX",),
"MAP_ADD": ("JUMP_ABSOLUTE",),
"COMPARE_OP": (
"POP_JUMP_IF_FALSE",
"POP_JUMP_IF_TRUE",
),
"IS_OP": (
"POP_JUMP_IF_FALSE",
"POP_JUMP_IF_TRUE",
),
"CONTAINS_OP": (
"POP_JUMP_IF_FALSE",
"POP_JUMP_IF_TRUE",
),
# Note: there are some others with PREDICT on ceval, but they have more logic
# and it needs more experimentation to know how it behaves in the static generated
# code (and it's only an issue for us if there's actually a line change between
# those, so, we don't have to really handle all the cases, only the one where
# the line number actually changes from one instruction to the predicted one).
}
# 3.10 optimizations include copying code branches multiple times (for instance
# if the body of a finally has a single assign statement it can copy the assign to the case
# where an exception happens and doesn't happen for optimization purposes) and as such
# we need to add the programmatic breakpoint multiple times.
TRACK_MULTIPLE_BRANCHES = sys.version_info[:2] >= (3, 10)
# When tracking multiple branches, we try to fix the bytecodes which would be PREDICTED in the
# Python eval loop so that we don't have spurious line events that wouldn't usually be issued
# in the tracing as they're ignored due to the eval prediction (even though they're in the bytecode).
FIX_PREDICT = sys.version_info[:2] >= (3, 10)
def insert_pydevd_breaks(
code_to_modify,
breakpoint_lines,
code_line_info=None,
_pydev_stop_at_break=_pydev_stop_at_break,
_pydev_needs_stop_at_break=_pydev_needs_stop_at_break,
):
"""
Inserts pydevd programmatic breaks into the code (at the given lines).
:param breakpoint_lines: set with the lines where we should add breakpoints.
:return: tuple(boolean flag whether insertion was successful, modified code).
"""
if code_line_info is None:
code_line_info = _get_code_line_info(code_to_modify)
if not code_line_info.line_to_offset:
return False, code_to_modify
# Create a copy (and make sure we're dealing with a set).
breakpoint_lines = set(breakpoint_lines)
# Note that we can even generate breakpoints on the first line of code
# now, since we generate a spurious line event -- it may be a bit pointless
# as we'll stop in the first line and we don't currently stop the tracing after the
# user resumes, but in the future, if we do that, this would be a nice
# improvement.
# if code_to_modify.co_firstlineno in breakpoint_lines:
# return False, code_to_modify
for line in breakpoint_lines:
if line <= 0:
# The first line is line 1, so, a break at line 0 is not valid.
pydev_log.info("Trying to add breakpoint in invalid line: %s", line)
return False, code_to_modify
try:
b = bytecode.Bytecode.from_code(code_to_modify)
if DEBUG:
op_number_bytecode = debug_helper.write_bytecode(b, prefix="bytecode.")
helper_list = _HelperBytecodeList(b)
modified_breakpoint_lines = breakpoint_lines.copy()
curr_node = helper_list.head
added_breaks_in_lines = set()
last_lineno = None
while curr_node is not None:
instruction = curr_node.data
instruction_lineno = getattr(instruction, "lineno", None)
curr_name = getattr(instruction, "name", None)
if FIX_PREDICT:
predict_targets = _PREDICT_TABLE.get(curr_name)
if predict_targets:
# Odd case: the next instruction may have a line number but it doesn't really
# appear in the tracing due to the PREDICT() in ceval, so, fix the bytecode so
# that it does things the way that ceval actually interprets it.
# See: https://mail.python.org/archives/list/python-dev@python.org/thread/CP2PTFCMTK57KM3M3DLJNWGO66R5RVPB/
next_instruction = curr_node.next.data
next_name = getattr(next_instruction, "name", None)
if next_name in predict_targets:
next_instruction_lineno = getattr(next_instruction, "lineno", None)
if next_instruction_lineno:
next_instruction.lineno = None
if instruction_lineno is not None:
if TRACK_MULTIPLE_BRANCHES:
if last_lineno is None:
last_lineno = instruction_lineno
else:
if last_lineno == instruction_lineno:
# If the previous is a label, someone may jump into it, so, we need to add
# the break even if it's in the same line.
if curr_node.prev.data.__class__ != Label:
# Skip adding this as the line is still the same.
curr_node = curr_node.next
continue
last_lineno = instruction_lineno
else:
if instruction_lineno in added_breaks_in_lines:
curr_node = curr_node.next
continue
if instruction_lineno in modified_breakpoint_lines:
added_breaks_in_lines.add(instruction_lineno)
if curr_node.prev is not None and curr_node.prev.data.__class__ == Label and curr_name == "POP_TOP":
# If we have a SETUP_FINALLY where the target is a POP_TOP, we can't change
# the target to be the breakpoint instruction (this can crash the interpreter).
for new_instruction in get_instructions_to_add(
instruction_lineno,
_pydev_stop_at_break=_pydev_stop_at_break,
_pydev_needs_stop_at_break=_pydev_needs_stop_at_break,
):
curr_node = curr_node.append(new_instruction)
else:
for new_instruction in get_instructions_to_add(
instruction_lineno,
_pydev_stop_at_break=_pydev_stop_at_break,
_pydev_needs_stop_at_break=_pydev_needs_stop_at_break,
):
curr_node.prepend(new_instruction)
curr_node = curr_node.next
b[:] = helper_list
if DEBUG:
debug_helper.write_bytecode(b, op_number_bytecode, prefix="bytecode.")
new_code = b.to_code()
except:
pydev_log.exception("Error inserting pydevd breaks.")
return False, code_to_modify
if DEBUG:
op_number = debug_helper.write_dis(code_to_modify)
debug_helper.write_dis(new_code, op_number)
return True, new_code
|