File: debug_cursor.py

package info (click to toggle)
python-django-extensions 4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,820 kB
  • sloc: python: 18,601; javascript: 7,354; makefile: 108; xml: 17
file content (144 lines) | stat: -rw-r--r-- 4,846 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# -*- coding: utf-8 -*-
import time
import traceback
from contextlib import contextmanager

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.db.backends import utils

from django_extensions.settings import DEFAULT_PRINT_SQL_TRUNCATE_CHARS


@contextmanager
def monkey_patch_cursordebugwrapper(
    print_sql=None,
    print_sql_location=False,
    truncate=None,
    logger=print,
    confprefix="DJANGO_EXTENSIONS",
):
    """
    Temporarily replace Django's debug cursor wrappers with SQL-printing ones.

    While the context is active, ``utils.CursorDebugWrapper`` (and the
    PostgreSQL backend's ``CursorDebugWrapper`` when that backend module can
    be imported) is swapped for a subclass whose ``execute`` hands the executed
    SQL and its execution time to ``logger``, and ``force_debug_cursor`` is set
    to ``True`` on every connection. The original classes and flags are put
    back when the context exits, including when the body raises.

    Args:
        print_sql: When falsy, nothing is patched and the context manager is a
            no-op.
        print_sql_location: When true, the current stack trace is also passed
            to ``logger`` after each executed statement.
        truncate: Maximum number of characters of SQL to output; a falsy value
            disables truncation. ``None`` means "read the
            ``<confprefix>_PRINT_SQL_TRUNCATE`` setting", falling back to
            ``DEFAULT_PRINT_SQL_TRUNCATE_CHARS``.
        logger: Callable invoked with each output string (default ``print``).
        confprefix: Prefix of the settings consulted here:
            ``_PRINT_SQL_TRUNCATE``, ``_SQLPARSE_ENABLED``,
            ``_SQLPARSE_FORMAT_KWARGS``, ``_PYGMENTS_ENABLED``,
            ``_PYGMENTS_FORMATTER`` and ``_PYGMENTS_FORMATTER_KWARGS``.
    """
    if not print_sql:
        # Nothing to patch: behave as a transparent context manager.
        yield
        return

    if truncate is None:
        truncate = getattr(
            settings,
            "%s_PRINT_SQL_TRUNCATE" % confprefix,
            DEFAULT_PRINT_SQL_TRUNCATE_CHARS,
        )

    # sqlparse is optional; SQL is emitted unformatted when it is unavailable
    # or disabled through settings.
    sqlparse = None
    if getattr(settings, "%s_SQLPARSE_ENABLED" % confprefix, True):
        try:
            import sqlparse

            sqlparse_format_kwargs_defaults = dict(
                reindent_aligned=True,
                truncate_strings=500,
            )
            sqlparse_format_kwargs = getattr(
                settings,
                "%s_SQLPARSE_FORMAT_KWARGS" % confprefix,
                sqlparse_format_kwargs_defaults,
            )
        except ImportError:
            sqlparse = None

    # pygments is optional as well; without it the SQL is not highlighted.
    pygments = None
    if getattr(settings, "%s_PYGMENTS_ENABLED" % confprefix, True):
        try:
            import pygments.lexers
            import pygments.formatters

            pygments_formatter = getattr(
                settings,
                "%s_PYGMENTS_FORMATTER" % confprefix,
                pygments.formatters.TerminalFormatter,
            )
            pygments_formatter_kwargs = getattr(
                settings, "%s_PYGMENTS_FORMATTER_KWARGS" % confprefix, {}
            )
        except ImportError:
            # The first import may have succeeded and bound the name before
            # the second one failed; reset it so execute() never attempts to
            # highlight with a formatter that was never configured.
            pygments = None

    class PrintQueryWrapperMixin:
        """Cursor mixin that reports every executed statement to ``logger``."""

        def execute(self, sql, params=()):
            starttime = time.time()
            try:
                return utils.CursorWrapper.execute(self, sql, params)
            finally:
                # Runs whether or not the statement succeeded, so failing
                # queries are reported too.
                execution_time = time.time() - starttime
                raw_sql = self.db.ops.last_executed_query(self.cursor, sql, params)
                if truncate:
                    raw_sql = raw_sql[:truncate]

                if sqlparse:
                    raw_sql = sqlparse.format(raw_sql, **sqlparse_format_kwargs)

                if pygments:
                    raw_sql = pygments.highlight(
                        raw_sql,
                        pygments.lexers.get_lexer_by_name("sql"),
                        pygments_formatter(**pygments_formatter_kwargs),
                    )

                logger(raw_sql)
                logger(
                    "Execution time: %.6fs [Database: %s]"
                    % (execution_time, self.db.alias)
                )
                if print_sql_location:
                    logger("Location of SQL Call:")
                    logger("".join(traceback.format_stack()))

    _CursorDebugWrapper = utils.CursorDebugWrapper

    class PrintCursorQueryWrapper(PrintQueryWrapperMixin, _CursorDebugWrapper):
        pass

    # Remember each connection's force_debug_cursor flag so it can be restored.
    try:
        from django.db import connections

        _force_debug_cursor = {}
        for connection_name in connections:
            _force_debug_cursor[connection_name] = connections[
                connection_name
            ].force_debug_cursor
    except Exception:
        # Best effort: without usable connections only the classes are patched.
        connections = None

    # Capture the PostgreSQL backend's wrapper *before* anything is patched, so
    # a first-time import of that module cannot pick up the replacement class
    # from ``utils`` and have it mistaken for the original.
    postgresql_base = None
    try:
        from django.db.backends.postgresql import base as postgresql_base

        _PostgreSQLCursorDebugWrapper = postgresql_base.CursorDebugWrapper

        class PostgreSQLPrintCursorDebugWrapper(
            PrintQueryWrapperMixin, _PostgreSQLCursorDebugWrapper
        ):
            pass
    except (ImproperlyConfigured, TypeError):
        postgresql_base = None

    try:
        utils.CursorDebugWrapper = PrintCursorQueryWrapper

        if postgresql_base:
            postgresql_base.CursorDebugWrapper = PostgreSQLPrintCursorDebugWrapper

        if connections:
            for connection_name in connections:
                connections[connection_name].force_debug_cursor = True

        yield
    finally:
        # Always undo the patches, even when the body of the ``with`` block
        # (or the patching itself) raised; otherwise they would leak for the
        # rest of the process.
        utils.CursorDebugWrapper = _CursorDebugWrapper

        if postgresql_base:
            postgresql_base.CursorDebugWrapper = _PostgreSQLCursorDebugWrapper

        if connections:
            for connection_name, previous in _force_debug_cursor.items():
                connections[connection_name].force_debug_cursor = previous