1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
# -*- coding: utf-8 -*-
import time
import traceback
from contextlib import contextmanager
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.db.backends import utils
from django_extensions.settings import DEFAULT_PRINT_SQL_TRUNCATE_CHARS
@contextmanager
def monkey_patch_cursordebugwrapper(
print_sql=None,
print_sql_location=False,
truncate=None,
logger=print,
confprefix="DJANGO_EXTENSIONS",
):
if not print_sql:
yield
else:
if truncate is None:
truncate = getattr(
settings,
"%s_PRINT_SQL_TRUNCATE" % confprefix,
DEFAULT_PRINT_SQL_TRUNCATE_CHARS,
)
sqlparse = None
if getattr(settings, "%s_SQLPARSE_ENABLED" % confprefix, True):
try:
import sqlparse
sqlparse_format_kwargs_defaults = dict(
reindent_aligned=True,
truncate_strings=500,
)
sqlparse_format_kwargs = getattr(
settings,
"%s_SQLPARSE_FORMAT_KWARGS" % confprefix,
sqlparse_format_kwargs_defaults,
)
except ImportError:
sqlparse = None
pygments = None
if getattr(settings, "%s_PYGMENTS_ENABLED" % confprefix, True):
try:
import pygments.lexers
import pygments.formatters
pygments_formatter = getattr(
settings,
"%s_PYGMENTS_FORMATTER" % confprefix,
pygments.formatters.TerminalFormatter,
)
pygments_formatter_kwargs = getattr(
settings, "%s_PYGMENTS_FORMATTER_KWARGS" % confprefix, {}
)
except ImportError:
pass
class PrintQueryWrapperMixin:
def execute(self, sql, params=()):
starttime = time.time()
try:
return utils.CursorWrapper.execute(self, sql, params)
finally:
execution_time = time.time() - starttime
raw_sql = self.db.ops.last_executed_query(self.cursor, sql, params)
if truncate:
raw_sql = raw_sql[:truncate]
if sqlparse:
raw_sql = sqlparse.format(raw_sql, **sqlparse_format_kwargs)
if pygments:
raw_sql = pygments.highlight(
raw_sql,
pygments.lexers.get_lexer_by_name("sql"),
pygments_formatter(**pygments_formatter_kwargs),
)
logger(raw_sql)
logger(
"Execution time: %.6fs [Database: %s]"
% (execution_time, self.db.alias)
)
if print_sql_location:
logger("Location of SQL Call:")
logger("".join(traceback.format_stack()))
_CursorDebugWrapper = utils.CursorDebugWrapper
class PrintCursorQueryWrapper(PrintQueryWrapperMixin, _CursorDebugWrapper):
pass
try:
from django.db import connections
_force_debug_cursor = {}
for connection_name in connections:
_force_debug_cursor[connection_name] = connections[
connection_name
].force_debug_cursor
except Exception:
connections = None
utils.CursorDebugWrapper = PrintCursorQueryWrapper
postgresql_base = None
try:
from django.db.backends.postgresql import base as postgresql_base
_PostgreSQLCursorDebugWrapper = postgresql_base.CursorDebugWrapper
class PostgreSQLPrintCursorDebugWrapper(
PrintQueryWrapperMixin, _PostgreSQLCursorDebugWrapper
):
pass
except (ImproperlyConfigured, TypeError):
postgresql_base = None
if postgresql_base:
postgresql_base.CursorDebugWrapper = PostgreSQLPrintCursorDebugWrapper
if connections:
for connection_name in connections:
connections[connection_name].force_debug_cursor = True
yield
utils.CursorDebugWrapper = _CursorDebugWrapper
if postgresql_base:
postgresql_base.CursorDebugWrapper = _PostgreSQLCursorDebugWrapper
if connections:
for connection_name in connections:
connections[connection_name].force_debug_cursor = _force_debug_cursor[
connection_name
]
|