1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
|
"""
Processors are functions that take a Frame object and mutate the tree to perform some task.
They can mutate the tree in-place, but can also change the root frame, so they should always be
called like::

    frame = processor(frame, options=...)
"""
from __future__ import annotations
import re
from typing import Any, Callable, Dict, Union
from pyinstrument.frame import SELF_TIME_FRAME_IDENTIFIER, Frame, FrameGroup
from pyinstrument.frame_ops import combine_frames, delete_frame_from_tree
# pyright: strict
# Type of a processor callable: accepts any arguments and returns a Frame
# (the root to use from then on) or None.
ProcessorType = Callable[..., Union[Frame, None]]
# Type of the ``options`` argument that every processor in this module accepts:
# a mapping from option name to an arbitrary value.
ProcessorOptions = Dict[str, Any]
def remove_importlib(frame: Frame | None, options: ProcessorOptions) -> Frame | None:
    """
    Strips ``<frozen importlib._bootstrap`` frames, which only clutter the output.

    Walks the tree depth-first. Every descendant whose file path contains the
    importlib bootstrap marker is deleted and replaced by its own children.
    ``options`` is only forwarded to the recursive calls. Returns the frame
    that was passed in, or None when given None.
    """
    if frame is None:
        return None

    marker = "<frozen importlib._bootstrap"

    for node in frame.children:
        # clean up the subtree first, then decide about the node itself
        remove_importlib(node, options=options)

        path = node.file_path
        if not path or marker not in path:
            continue

        delete_frame_from_tree(node, replace_with="children")

    return frame
def remove_tracebackhide(frame: Frame | None, options: ProcessorOptions) -> Frame | None:
    """
    Hides frames that set a local `__tracebackhide__` (e.g.
    `__tracebackhide__ = True`) by removing them from the tree.

    Walks the tree depth-first. ``options`` is only forwarded to the
    recursive calls. Returns the frame that was passed in, or None when
    given None.
    """
    if frame is None:
        return None

    for node in frame.children:
        remove_tracebackhide(node, options=options)

        if not node.has_tracebackhide:
            continue

        # drop this node; its self_time and children move up to the parent
        delete_frame_from_tree(node, replace_with="children")

    return frame
def aggregate_repeated_calls(frame: Frame | None, options: ProcessorOptions) -> Frame | None:
    """
    Turns a timeline into a time-aggregated summary.

    Sibling frames that share an identifier are combined into the first one
    seen, so repeated calls along the same call stack show up as one frame.
    Time-linearity is lost: children end up ordered by time, largest first.

    Useful for outputs that display a summary of execution (e.g. text and html outputs)
    """
    if frame is None:
        return None

    first_seen: dict[str, Frame] = {}

    # combine_frames changes the tree while we loop, so this depends on
    # frame.children not being the live list (see the note on _children below)
    for node in frame.children:
        key = node.identifier

        if key not in first_seen:
            # first frame with this identifier: it becomes the aggregate
            first_seen[key] = node
            continue

        # a repeat: fold it into the aggregate, which removes it from the tree
        combine_frames(node, into=first_seen[key])

    # aggregate each remaining child's own subtree
    for node in frame.children:
        aggregate_repeated_calls(node, options=options)

    # order children by time, descending. The internal _children list is used
    # because the ordering has to be changed in place.
    frame._children.sort(key=lambda node: node.time, reverse=True)  # type: ignore # noqa

    return frame
def group_library_frames_processor(frame: Frame | None, options: ProcessorOptions) -> Frame | None:
    """
    Collects frames that should be hidden into :class:`FrameGroup` objects.

    Whether a frame is hidden is decided by matching ``hide_regex`` and
    ``show_regex`` (from the options dict) against the file path of the
    frame's source code. If both match, 'show' wins. If neither matches,
    a frame is hidden when it is not application code.

    Options:

    ``hide_regex``
      regular expression, which if matches the file path, hides the frame in a
      frame group.
    ``show_regex``
      regular expression, which if matches the file path, ensures the frame is
      not hidden

    A lone hidden frame is not grouped; a group is only started at a hidden
    frame that has at least one hidden child.
    """
    if frame is None:
        return None

    hide_regex: str | None = options.get("hide_regex")
    show_regex: str | None = options.get("show_regex")

    def is_hidden(node: Frame):
        path = node.file_path or ""

        explicitly_shown = (show_regex is not None) and re.match(show_regex, path)
        explicitly_hidden = (hide_regex is not None) and re.match(hide_regex, path)

        # explicit user rules come first, and 'show' beats 'hide'
        if explicitly_shown:
            return False

        return True if explicitly_hidden else not node.is_application_code

    def collect_into_group(node: Frame, group: FrameGroup):
        group.add_frame(node)

        # filter() is lazy, so each child is tested just before it is visited
        for hidden_child in filter(is_hidden, node.children):
            collect_into_group(hidden_child, group)

    for node in frame.children:
        starts_group = (
            not node.group
            and is_hidden(node)
            and any(is_hidden(grandchild) for grandchild in node.children)
        )

        if starts_group:
            collect_into_group(node, FrameGroup(node))

        group_library_frames_processor(node, options=options)

    return frame
def merge_consecutive_self_time(
    frame: Frame | None, options: ProcessorOptions, recursive: bool = True
) -> Frame | None:
    """
    Merges runs of adjacent 'self time' children into one frame.

    The first self-time frame of a run absorbs the time of those that
    directly follow it, and the absorbed frames are removed from their
    parent. Any other kind of child ends the run. When ``recursive`` is true
    the same is done for every descendant. Returns the frame that was passed
    in, or None when given None.
    """
    if frame is None:
        return None

    # the self-time frame at the start of the current unbroken run, if any
    run_head = None

    for node in frame.children:
        if node.identifier != SELF_TIME_FRAME_IDENTIFIER:
            # an ordinary frame ends the run
            run_head = None
            continue

        if not run_head:
            # start of a new run; later self-time frames may be added to it
            run_head = node
            continue

        # continuation of the run: fold this frame into the head
        run_head.time += node.time
        node.remove_from_parent()

    if not recursive:
        return frame

    for node in frame.children:
        merge_consecutive_self_time(node, options=options, recursive=True)

    return frame
def remove_unnecessary_self_time_nodes(
    frame: Frame | None, options: ProcessorOptions
) -> Frame | None:
    """
    Removes a self-time frame when it is the only child of its parent.

    Such a node clutters the output and offers no additional information, so
    it is deleted without a replacement. The check is applied to the given
    frame and then to every descendant. Returns the frame that was passed in,
    or None when given None.
    """
    if frame is None:
        return None

    kids = frame.children
    if len(kids) == 1:
        only_child = kids[0]
        if only_child.identifier == SELF_TIME_FRAME_IDENTIFIER:
            delete_frame_from_tree(only_child, replace_with="nothing")

    # read the children again, since the deletion above may have changed them
    for node in frame.children:
        remove_unnecessary_self_time_nodes(node, options=options)

    return frame
def remove_irrelevant_nodes(
    frame: Frame | None, options: ProcessorOptions, total_time: float | None = None
) -> Frame | None:
    """
    Remove nodes that represent less than e.g. 1% of the output. Options:

    ``filter_threshold``
      sets the minimum share of the total time (``child.time / total_time``)
      a frame needs to be included in the output. Default: 0.01.

    ``total_time`` is the reference duration the shares are measured against.
    When omitted, the time of ``frame`` is used, and the same value is then
    passed down to all recursive calls. Returns the frame that was passed in,
    or None when given None.
    """
    if frame is None:
        return None

    if total_time is None:
        total_time = frame.time

    # prevent divide by zero, whether the total came from the frame or from
    # the caller
    if total_time <= 0:
        total_time = 1e-44

    filter_threshold = options.get("filter_threshold", 0.01)

    # NOTE(review): children are deleted while looping over frame.children;
    # this assumes the attribute yields a snapshot rather than the live list.
    for child in frame.children:
        proportion_of_total = child.time / total_time

        if proportion_of_total < filter_threshold:
            delete_frame_from_tree(child, replace_with="nothing")

    # only the surviving children are visited
    for child in frame.children:
        remove_irrelevant_nodes(child, options=options, total_time=total_time)

    return frame
# pylint: disable=W0613
def remove_first_pyinstrument_frames_processor(
    frame: Frame | None, options: ProcessorOptions
) -> Frame | None:
    """
    The first few frames when using the command line are the __main__ of
    pyinstrument, the eval, and the 'runpy' module. I want to remove that from
    the output.

    The tree is only trimmed when the root is pyinstrument's ``__main__.py``
    and its first child is a dominant ``<string>`` frame. In that case any
    leading runpy frames below are skipped as well, and the first frame that
    is not one of these is detached from its parent and returned as the new
    root. In every other case the original ``frame`` is returned unchanged.
    ``options`` is not used.
    """
    if frame is None:
        return None

    # the initial pyinstrument frame
    def is_initial_pyinstrument_frame(node: Frame) -> bool:
        return bool(
            node.file_path is not None
            # the dot is escaped so only a real ``__main__.py`` matches
            and re.match(r".*pyinstrument[/\\]__main__\.py", node.file_path)
            and len(node.children) > 0
        )

    # the frame created by running the user's code from a string
    def is_exec_frame(node: Frame) -> bool:
        return bool(
            node.proportion_of_parent > 0.8
            and node.file_path is not None
            and "<string>" in node.file_path
            and len(node.children) > 0
        )

    # a frame of the runpy module, from a file or from the frozen module
    def is_runpy_frame(node: Frame) -> bool:
        return bool(
            node.proportion_of_parent > 0.8
            and node.file_path is not None
            and (re.match(r".*runpy\.py", node.file_path) or "<frozen runpy>" in node.file_path)
            and len(node.children) > 0
        )

    result = frame

    if not is_initial_pyinstrument_frame(result):
        return frame

    # each predicate guarantees at least one child, so children[0] is safe
    result = result.children[0]

    if not is_exec_frame(result):
        return frame

    result = result.children[0]

    # at this point we know we've matched the first few frames of a command
    # line invocation. We'll trim some runpy frames and return.
    while is_runpy_frame(result):
        result = result.children[0]

    # remove this frame from the parent to make it the new root frame
    result.remove_from_parent()

    return result
|