"""Decorator and context-manager helpers for profiling code with pyinstrument."""
from __future__ import annotations
import functools
import inspect
import sys
import typing
from pyinstrument.profiler import AsyncMode, Profiler
from pyinstrument.renderers.base import Renderer
from pyinstrument.renderers.console import ConsoleRenderer
from pyinstrument.typing import Unpack
from pyinstrument.util import file_supports_color, file_supports_unicode
# Type variable used by the decorator overloads so that decorating a callable
# is typed as returning the same callable type that was passed in.
CallableVar = typing.TypeVar("CallableVar", bound=typing.Callable)
class ProfileContextOptions(typing.TypedDict, total=False):
    """Keyword options accepted by ``ProfileContext`` and ``profile``.

    Every key is optional (``total=False``); the fallbacks described below are
    the ones applied by ``ProfileContext`` when a key is omitted.
    """

    # Passed to ``Profiler`` as ``interval``; 0.001 is used when omitted.
    interval: float
    # Passed to ``Profiler`` as ``async_mode``; "disabled" is used when omitted.
    async_mode: AsyncMode
    # Passed to ``Profiler`` as ``use_timing_thread``; None is used when omitted.
    use_timing_thread: bool | None
    # Renderer for the finished session. When missing or None, a short-mode
    # ``ConsoleRenderer`` configured for ``sys.stderr`` is used.
    renderer: Renderer | None
    # Description handed to ``Profiler.start``. When missing or None, one is
    # generated from the decorated function or the location of the ``with`` block.
    target_description: str | None
class ProfileContext:
    """Profile a block of code or a function and write the report to stderr.

    An instance can be used in three ways:

    * as a context manager: ``with ProfileContext(...): ...``
    * as a decorator, by calling it with one positional callable
    * as a factory, by calling it with keyword options only, which returns a
      new ``ProfileContext`` whose options are this instance's options
      overridden by the given ones

    When the ``with`` block exits, the session is rendered with
    ``options["renderer"]`` or, if that is unset, with a short-mode
    ``ConsoleRenderer``, and the output is written to ``sys.stderr``.
    """

    options: ProfileContextOptions

    def __init__(
        self,
        **kwargs: Unpack[ProfileContextOptions],
    ):
        """Create the underlying ``Profiler`` and remember the given options.

        Args:
            **kwargs: Any of the ``ProfileContextOptions`` keys. ``interval``,
                ``async_mode`` and ``use_timing_thread`` are forwarded to
                ``Profiler``; the full mapping is stored as ``self.options``.
        """
        profiler_options = {
            "interval": kwargs.get("interval", 0.001),
            # note- different async mode from the default, because it's easy
            # to run multiple profilers at once using the decorator/context
            # manager
            "async_mode": kwargs.get("async_mode", "disabled"),
            "use_timing_thread": kwargs.get("use_timing_thread", None),
        }
        self.profiler = Profiler(**profiler_options)
        self.options = kwargs

    @staticmethod
    def _describe_function(func: typing.Callable) -> str:
        """Build the default target description for a decorated callable.

        Plain Python functions produce ``"Function <qualname> at <file>:<line>"``.
        Callables without ``__code__`` (builtins, ``functools.partial`` objects,
        instances defining ``__call__``) have no source location, so only the
        name is reported; ``repr(func)`` stands in when there is no name either.
        """
        func_name = (
            getattr(func, "__qualname__", None)
            or getattr(func, "__name__", None)
            or repr(func)
        )
        code = getattr(func, "__code__", None)
        if code is None:
            return f"Function {func_name}"
        return f"Function {func_name} at {code.co_filename}:{code.co_firstlineno}"

    @typing.overload
    def __call__(self, func: CallableVar, /) -> CallableVar: ...

    @typing.overload
    def __call__(self, /, **kwargs: Unpack[ProfileContextOptions]) -> "ProfileContext": ...

    def __call__(
        self, func: typing.Callable | None = None, /, **kwargs: Unpack[ProfileContextOptions]
    ):
        """Decorate ``func``, or derive a new context with overridden options.

        Args:
            func: Callable to wrap. When given, ``kwargs`` are not used.
            **kwargs: Option overrides, used only when ``func`` is None.

        Returns:
            A wrapper that profiles every call to ``func`` when ``func`` is
            given; otherwise a new ``ProfileContext`` built from this
            instance's options merged with ``kwargs``.
        """
        if func is None:
            return ProfileContext(**{**self.options, **kwargs})

        @functools.wraps(func)
        def wrapper(*args, **func_kwargs):
            target_description = self.options.get("target_description")
            if target_description is None:
                target_description = self._describe_function(func)

            # A fresh ProfileContext (and so a fresh Profiler) is created for
            # each call of the decorated function.
            with self(target_description=target_description):
                return func(*args, **func_kwargs)

        return typing.cast(typing.Callable, wrapper)

    def __enter__(self):
        """Start profiling the code that entered the ``with`` block.

        Raises:
            RuntimeError: If this instance's profiler is already running.
        """
        if self.profiler.is_running:
            raise RuntimeError(
                "This profiler is already running - did you forget the brackets on pyinstrument.profile() ?"
            )

        # The frame that called __enter__ is the one containing the `with` block.
        caller_frame = inspect.currentframe().f_back  # type: ignore
        assert caller_frame is not None

        target_description = self.options.get("target_description")
        if target_description is None:
            target_description = "Block at {}:{}".format(
                caller_frame.f_code.co_filename, caller_frame.f_lineno
            )

        self.profiler.start(
            caller_frame=caller_frame,
            target_description=target_description,
        )

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop profiling and write the rendered session to ``sys.stderr``.

        Returns None, so an exception raised inside the block is not suppressed.
        """
        session = self.profiler.stop()

        renderer = self.options.get("renderer")
        f = sys.stderr

        if renderer is None:
            renderer = ConsoleRenderer(
                color=file_supports_color(f),
                unicode=file_supports_unicode(f),
                short_mode=True,
            )

        f.write(renderer.render(session))
class _Profile:
    """Callable that builds a ``ProfileContext`` from keyword options.

    Called with a function, it returns that function wrapped by a new
    ``ProfileContext``; called with keyword options only, it returns the new
    ``ProfileContext`` itself.
    """

    @typing.overload
    def __call__(self, func: CallableVar, /) -> CallableVar: ...

    @typing.overload
    def __call__(self, /, **kwargs: Unpack[ProfileContextOptions]) -> "ProfileContext": ...

    def __call__(
        self, func: typing.Callable | None = None, /, **kwargs: Unpack[ProfileContextOptions]
    ):
        # A context is created in both usages; only what is returned differs.
        context = ProfileContext(**kwargs)
        if func is None:
            return context
        return context(func)
# Module-level entry point: use as ``@profile``, ``@profile(...)`` or
# ``with profile(...):``.
profile = _Profile()
|