1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
|
# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Functions to instrument all Python function calls.
This generates a JSON file readable by Chrome's about:tracing. To use it,
either call start_instrumenting and stop_instrumenting at the appropriate times,
or use the Instrument context manager.
A function is only traced if it is from a Python module that matches at least
one regular expression object in to_include, and does not match any in
to_exclude. In between the start and stop events, every function call of a
function from such a module will be added to the trace.
"""
import contextlib
import functools
import inspect
import os
import re
import sys
import threading
from py_trace_event import trace_event
# Modules to exclude by default (to avoid problems like infinite loops)
DEFAULT_EXCLUDE = [r'py_trace_event\..*']
class _TraceArguments:
def __init__(self):
"""Wraps a dictionary to ensure safe evaluation of repr()."""
self._arguments = {}
@staticmethod
def _safeStringify(item):
try:
item_str = repr(item)
except Exception: # pylint: disable=broad-except
try:
item_str = str(item)
except Exception: # pylint: disable=broad-except
item_str = "<ERROR>"
return item_str
def add(self, key, val):
key_str = _TraceArguments._safeStringify(key)
val_str = _TraceArguments._safeStringify(val)
self._arguments[key_str] = val_str
def __repr__(self):
return repr(self._arguments)
saved_thread_ids = set()
def _shouldTrace(frame, to_include, to_exclude, included, excluded):
"""
Decides whether or not the function called in frame should be traced.
Args:
frame: The Python frame object of this function call.
to_include: Set of regex objects for modules which should be traced.
to_exclude: Set of regex objects for modules which should not be traced.
included: Set of module names we've determined should be traced.
excluded: Set of module names we've determined should not be traced.
"""
if not inspect.getmodule(frame):
return False
module_name = inspect.getmodule(frame).__name__
if module_name in included:
includes = True
elif to_include:
includes = any(pattern.match(module_name) for pattern in to_include)
else:
includes = True
if includes:
included.add(module_name)
else:
return False
# Find the modules of every function in the stack trace.
frames = inspect.getouterframes(frame)
calling_module_names = [inspect.getmodule(fr[0]).__name__ for fr in frames]
# Return False for anything with an excluded module's function anywhere in the
# stack trace (even if the function itself is in an included module).
if to_exclude:
for calling_module in calling_module_names:
if calling_module in excluded:
return False
for pattern in to_exclude:
if pattern.match(calling_module):
excluded.add(calling_module)
return False
return True
def _generate_trace_function(to_include, to_exclude):
to_include = {re.compile(item) for item in to_include}
to_exclude = {re.compile(item) for item in to_exclude}
to_exclude.update({re.compile(item) for item in DEFAULT_EXCLUDE})
included = set()
excluded = set()
tracing_pid = os.getpid()
def traceFunction(frame, event, arg):
del arg
# Don't try to trace in subprocesses.
if os.getpid() != tracing_pid:
sys.settrace(None)
return None
# pylint: disable=unused-argument
if event not in ("call", "return"):
return None
function_name = frame.f_code.co_name
filename = frame.f_code.co_filename
line_number = frame.f_lineno
if _shouldTrace(frame, to_include, to_exclude, included, excluded):
if event == "call":
# This function is beginning; we save the thread name (if that hasn't
# been done), record the Begin event, and return this function to be
# used as the local trace function.
thread_id = threading.current_thread().ident
if thread_id not in saved_thread_ids:
thread_name = threading.current_thread().name
trace_event.trace_set_thread_name(thread_name)
saved_thread_ids.add(thread_id)
arguments = _TraceArguments()
# The function's argument values are stored in the frame's
# |co_varnames| as the first |co_argcount| elements. (Following that
# are local variables.)
for idx in range(frame.f_code.co_argcount):
arg_name = frame.f_code.co_varnames[idx]
arguments.add(arg_name, frame.f_locals[arg_name])
trace_event.trace_begin(function_name, arguments=arguments,
module=inspect.getmodule(frame).__name__,
filename=filename, line_number=line_number)
# Return this function, so it gets used as the "local trace function"
# within this function's frame (and in particular, gets called for this
# function's "return" event).
return traceFunction
if event == "return":
trace_event.trace_end(function_name)
return None
return None
return traceFunction
def no_tracing(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
trace_func = sys.gettrace()
try:
sys.settrace(None)
threading.settrace(None)
return f(*args, **kwargs)
finally:
sys.settrace(trace_func)
threading.settrace(trace_func)
return wrapper
def start_instrumenting(output_file, to_include=(), to_exclude=()):
"""Enable tracing of all function calls (from specified modules)."""
trace_event.trace_enable(output_file)
traceFunc = _generate_trace_function(to_include, to_exclude)
sys.settrace(traceFunc)
threading.settrace(traceFunc)
def stop_instrumenting():
trace_event.trace_disable()
sys.settrace(None)
threading.settrace(None)
@contextlib.contextmanager
def Instrument(output_file, to_include=(), to_exclude=()):
try:
start_instrumenting(output_file, to_include, to_exclude)
yield None
finally:
stop_instrumenting()
|