1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
from __future__ import annotations
import marshal
from typing import Any, Dict, Tuple
from pyinstrument import processors
from pyinstrument.frame import Frame
from pyinstrument.renderers.base import FrameRenderer, ProcessorList
from pyinstrument.session import Session
# pyright: strict
# Identifies a frame: (file path, line number, function name), as built by
# PstatsRenderer.frame_key.
FrameKey = Tuple[str, int, str]
# Per-caller totals, in the order render_frame stores them:
# (call_time, number_calls, total_time, cumulative_time).
CallerValue = Tuple[float, int, float, float]
# Per-frame totals: the same four fields followed by a mapping from each
# caller's FrameKey to its CallerValue.
FrameValue = Tuple[float, int, float, float, Dict[FrameKey, CallerValue]]
# The complete table that gets marshaled: one FrameValue per FrameKey.
StatsDict = Dict[FrameKey, FrameValue]
class PstatsRenderer(FrameRenderer):
    """
    Renders a session as a marshaled stats dictionary in pstat format, which
    tools such as gprof2dot and snakeviz can consume.
    """

    output_file_extension = "pstats"
    output_is_binary = True

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)

    def frame_key(self, frame: Frame) -> FrameKey:
        """
        Build the (file path, line number, function) key for ``frame``.

        A falsy file path becomes "" and a falsy line number becomes 0.
        """
        file_path = frame.file_path or ""
        line_no = frame.line_no or 0
        return (file_path, line_no, frame.function)

    def render_frame(self, frame: Frame | None, stats: StatsDict) -> None:
        """
        Fold ``frame`` and its non-synthetic descendants into ``stats``.

        ``stats`` is updated in place. Frames that share a key are merged: their
        self time and time are summed into one entry, and the same amounts are
        also recorded against the parent's key in that entry's callers mapping.
        A ``frame`` of None is ignored.

        NOTE(review): every visited frame adds its own ``frame.time``, so if
        ``frame.time`` includes time spent in descendants, a function nested
        beneath itself is counted more than once in the cumulative figure --
        confirm this is intended.
        """
        if frame is None:
            return

        key = self.frame_key(frame)

        existing = stats.get(key)
        if existing is None:
            # First sighting of this key. A statistical profiler cannot know
            # the exact call time or number of calls, so both are stubbed
            # with -1.
            call_time, number_calls, total_time, cumulative_time = -1, -1, 0, 0
            callers: dict[FrameKey, CallerValue] = {}
        else:
            call_time, number_calls, total_time, cumulative_time, callers = existing

        # Add this frame's contribution to the running totals.
        new_total_time = total_time + frame.total_self_time
        new_cumulative_time = cumulative_time + frame.time

        parent = frame.parent
        if parent:
            # Record the same contribution on the caller edge from the parent.
            parent_key = self.frame_key(parent)
            edge_call_time, edge_number_calls, edge_total, edge_cumulative = callers.get(
                parent_key, (-1, -1, 0, 0)
            )
            callers[parent_key] = (
                edge_call_time,
                edge_number_calls,
                edge_total + frame.total_self_time,
                edge_cumulative + frame.time,
            )

        stats[key] = (
            call_time,
            number_calls,
            new_total_time,
            new_cumulative_time,
            callers,
        )

        for child in frame.children:
            if child.is_synthetic:
                continue
            self.render_frame(child, stats)

    def render(self, session: Session):
        """
        Preprocess the session's root frame, collect its stats and return the
        marshaled result as a string.
        """
        root = self.preprocess(session.root_frame())
        collected: StatsDict = {}
        self.render_frame(root, collected)

        # marshal.dumps produces bytes; decode them to a string using the
        # surrogateescape error handler.
        payload = marshal.dumps(collected)
        return payload.decode(encoding="utf-8", errors="surrogateescape")

    def default_processors(self) -> ProcessorList:
        """Return the frame processors this renderer uses by default, in order."""
        pipeline: ProcessorList = [
            processors.remove_importlib,
            processors.remove_tracebackhide,
            processors.merge_consecutive_self_time,
            processors.aggregate_repeated_calls,
            processors.remove_irrelevant_nodes,
            processors.remove_unnecessary_self_time_nodes,
            processors.remove_first_pyinstrument_frames_processor,
        ]
        return pipeline
|