File: forms.py

package info (click to toggle)
python-django-debug-toolbar 1%3A6.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,052 kB
  • sloc: python: 7,555; javascript: 636; makefile: 67; sh: 16
file content (136 lines) | stat: -rw-r--r-- 4,569 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import json

from django import forms
from django.core.exceptions import ValidationError
from django.db import connections
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _

from debug_toolbar.panels.sql.utils import is_select_query, reformat_sql
from debug_toolbar.toolbar import DebugToolbar


class SQLSelectForm(forms.Form):
    """
    Validate params

        request_id: The identifier for the request
        query_id: The identifier for the query
    """

    request_id = forms.CharField()
    djdt_query_id = forms.CharField()

    def clean_raw_sql(self):
        value = self.cleaned_data["raw_sql"]

        if not is_select_query(value):
            raise ValidationError("Only 'select' queries are allowed.")

        return value

    def clean_params(self):
        value = self.cleaned_data["params"]

        try:
            return json.loads(value)
        except ValueError as exc:
            raise ValidationError("Is not valid JSON") from exc

    def clean_alias(self):
        value = self.cleaned_data["alias"]

        if value not in connections:
            raise ValidationError(f"Database alias '{value}' not found")

        return value

    def clean(self):
        from debug_toolbar.panels.sql import SQLPanel

        cleaned_data = super().clean()
        toolbar = DebugToolbar.fetch(
            self.cleaned_data["request_id"], panel_id=SQLPanel.panel_id
        )
        if toolbar is None:
            raise ValidationError(_("Data for this panel isn't available anymore."))

        panel = toolbar.get_panel_by_id(SQLPanel.panel_id)
        # Find the query for this form submission
        query = None
        for q in panel.get_stats()["queries"]:
            if q["djdt_query_id"] != self.cleaned_data["djdt_query_id"]:
                continue
            else:
                query = q
                break
        if not query:
            raise ValidationError(_("Invalid query id."))
        cleaned_data["query"] = query
        return cleaned_data

    def select(self):
        query = self.cleaned_data["query"]
        sql = query["raw_sql"]
        params = json.loads(query["params"])
        with self.cursor as cursor:
            cursor.execute(sql, params)
            headers = [d[0] for d in cursor.description]
            result = cursor.fetchall()
            return result, headers

    def explain(self):
        query = self.cleaned_data["query"]
        sql = query["raw_sql"]
        params = json.loads(query["params"])
        vendor = query["vendor"]
        with self.cursor as cursor:
            if vendor == "sqlite":
                # SQLite's EXPLAIN dumps the low-level opcodes generated for a query;
                # EXPLAIN QUERY PLAN dumps a more human-readable summary
                # See https://www.sqlite.org/lang_explain.html for details
                cursor.execute(f"EXPLAIN QUERY PLAN {sql}", params)
            elif vendor == "postgresql":
                cursor.execute(f"EXPLAIN ANALYZE {sql}", params)
            else:
                cursor.execute(f"EXPLAIN {sql}", params)
            headers = [d[0] for d in cursor.description]
            result = cursor.fetchall()
            return result, headers

    def profile(self):
        query = self.cleaned_data["query"]
        sql = query["raw_sql"]
        params = json.loads(query["params"])
        with self.cursor as cursor:
            cursor.execute("SET PROFILING=1")  # Enable profiling
            cursor.execute(sql, params)  # Execute SELECT
            cursor.execute("SET PROFILING=0")  # Disable profiling
            # The Query ID should always be 1 here but I'll subselect to get
            # the last one just in case...
            cursor.execute(
                """
                SELECT *
                FROM information_schema.profiling
                WHERE query_id = (
                    SELECT query_id
                    FROM information_schema.profiling
                    ORDER BY query_id DESC
                    LIMIT 1
                )
                """
            )
            headers = [d[0] for d in cursor.description]
            result = cursor.fetchall()
            return result, headers

    def reformat_sql(self):
        return reformat_sql(self.cleaned_data["query"]["sql"], with_toggle=False)

    @property
    def connection(self):
        return connections[self.cleaned_data["query"]["alias"]]

    @cached_property
    def cursor(self):
        return self.connection.cursor()