1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
|
"""
Debug ORMAdapter calls within ORM runs.
Demos::
$ python tools/trace_orm_adapter.py -m pytest \
test/orm/inheritance/test_polymorphic_rel.py::PolymorphicAliasedJoinsTest::test_primary_eager_aliasing_joinedload
$ python tools/trace_orm_adapter.py -m pytest \
test/orm/test_eager_relations.py::LazyLoadOptSpecificityTest::test_pathed_joinedload_aliased_abs_bcs
$ python tools/trace_orm_adapter.py my_test_script.py
The above two tests should spit out a ton of debug output. If a test or program
has no debug output at all, that's a good thing! it means ORMAdapter isn't
used for that case.
You can then set a breakpoint at the end of any adapt step::
$ python tools/trace_orm_adapter.py -d 10 -m pytest -s \
test/orm/test_eager_relations.py::LazyLoadOptSpecificityTest::test_pathed_joinedload_aliased_abs_bcs
""" # noqa: E501
# mypy: ignore-errors
from __future__ import annotations
import argparse
import contextlib
import contextvars
import sys
from typing import TYPE_CHECKING
from sqlalchemy.orm import util
if TYPE_CHECKING:
from typing import Any
from typing import List
from typing import Optional
from sqlalchemy.sql.elements import ColumnElement
class _ORMAdapterTrace:
def _locate_col(
self, col: ColumnElement[Any]
) -> Optional[ColumnElement[Any]]:
with self._tracer("_locate_col") as tracer:
return tracer(super()._locate_col, col)
def replace(self, col, _include_singleton_constants: bool = False):
with self._tracer("replace") as tracer:
return tracer(super().replace, col)
_orm_adapter_trace_context = contextvars.ContextVar("_tracer")
@contextlib.contextmanager
def _tracer(self, meth):
adapter = self
ctx = self._orm_adapter_trace_context.get(
{"stack": [], "last_depth": 0, "line_no": 0}
)
self._orm_adapter_trace_context.set(ctx)
stack: List[Any] = ctx["stack"] # type: ignore
last_depth = len(stack)
line_no: int = ctx["line_no"] # type: ignore
ctx["last_depth"] = last_depth
stack.append((adapter, meth))
indent = " " * last_depth
if hasattr(adapter, "mapper"):
adapter_desc = (
f"{adapter.__class__.__name__}"
f"({adapter.role.name}, mapper={adapter.mapper})"
)
else:
adapter_desc = f"{adapter.__class__.__name__}({adapter.role.name})"
def tracer_fn(fn, arg):
nonlocal line_no
line_no += 1
print(f"{indent} {line_no} {adapter_desc}", file=REAL_STDOUT)
sub_indent = " " * len(f"{line_no} ")
print(
f"{indent}{sub_indent} -> "
f"{meth} {_orm_adapter_trace_print(arg)}",
file=REAL_STDOUT,
)
ctx["line_no"] = line_no
ret = fn(arg)
if DEBUG_ADAPT_STEP == line_no:
breakpoint()
if ret is arg:
print(f"{indent} {line_no} <- same object", file=REAL_STDOUT)
else:
print(
f"{indent} {line_no} <- {_orm_adapter_trace_print(ret)}",
file=REAL_STDOUT,
)
if last_depth == 0:
print("", file=REAL_STDOUT)
return ret
try:
yield tracer_fn
finally:
stack.pop(-1)
util.ORMAdapter.__bases__ = (_ORMAdapterTrace,) + util.ORMAdapter.__bases__
util.ORMStatementAdapter.__bases__ = (
_ORMAdapterTrace,
) + util.ORMStatementAdapter.__bases__
def _orm_adapter_trace_print(obj):
if obj is None:
return "None"
t_print = _orm_adapter_trace_printers.get(obj.__visit_name__, None)
if t_print:
return t_print(obj)
else:
return f"{obj!r}"
_orm_adapter_trace_printers = {
"table": lambda t: (
f'Table("{t.name}", '
f"entity={t._annotations.get('parentmapper', None)})"
),
"column": lambda c: (
f'Column("{c.name}", {_orm_adapter_trace_print(c.table)} '
f"entity={c._annotations.get('parentmapper', None)})"
),
"join": lambda j: (
f"{j.__class__.__name__}({_orm_adapter_trace_print(j.left)}, "
f"{_orm_adapter_trace_print(j.right)})"
),
"label": lambda l: f"Label({_orm_adapter_trace_print(l.element)})",
}
DEBUG_ADAPT_STEP = None
REAL_STDOUT = sys.__stdout__
def main():
global DEBUG_ADAPT_STEP
parser = argparse.ArgumentParser()
parser.add_argument(
"-d", "--debug", type=int, help="breakpoint at this adaptation step"
)
parser.add_argument(
"-m",
"--module",
type=str,
help="import module name instead of running a script",
)
parser.add_argument(
"args", metavar="N", type=str, nargs="*", help="additional arguments"
)
argparse_args = []
sys_argv = list(sys.argv)
progname = sys_argv.pop(0)
# this is a little crazy, works at the moment for:
# module w args:
# python tools/trace_orm_adapter.py -m pytest test/orm/test_query.py -s
# script:
# python tools/trace_orm_adapter.py test3.py
has_module = False
while sys_argv:
arg = sys_argv.pop(0)
if arg in ("-m", "--module", "-d", "--debug"):
argparse_args.append(arg)
argparse_args.append(sys_argv.pop(0))
has_module = arg in ("-m", "--module")
else:
if not has_module:
argparse_args.append(arg)
else:
sys_argv.insert(0, arg)
break
options = parser.parse_args(argparse_args)
sys.argv = ["program.py"] + sys_argv
if options.module == "pytest":
sys.argv.extend(["--capture", "sys"])
import runpy
if options.debug:
DEBUG_ADAPT_STEP = options.debug
if options.module:
runpy.run_module(options.module, run_name="__main__")
else:
progname = options.args[0]
runpy.run_path(progname)
if __name__ == "__main__":
main()
|