1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
import json
from django import forms
from django.core.exceptions import ValidationError
from django.db import connections
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from debug_toolbar.panels.sql.utils import is_select_query, reformat_sql
from debug_toolbar.toolbar import DebugToolbar
class SQLSelectForm(forms.Form):
    """
    Validate params and re-run the recorded SQL query they identify.

        request_id: The identifier for the request
        djdt_query_id: The identifier for the query

    After successful validation ``cleaned_data["query"]`` holds the matching
    entry from the SQL panel's ``"queries"`` stats. ``select()``,
    ``explain()`` and ``profile()`` execute that recorded statement and
    return ``(rows, column_headers)``.
    """

    request_id = forms.CharField()
    djdt_query_id = forms.CharField()

    # NOTE: the three ``clean_<name>`` hooks below refer to ``raw_sql``,
    # ``params`` and ``alias``, none of which is declared as a field on this
    # form, so Django's per-field cleaning does not call them. They are kept
    # so the class keeps its existing attributes; the "select only" rule is
    # enforced in ``clean()`` against the stored query instead.

    def clean_raw_sql(self):
        """Reject any ``raw_sql`` value that is not a SELECT statement."""
        value = self.cleaned_data["raw_sql"]
        if not is_select_query(value):
            raise ValidationError("Only 'select' queries are allowed.")
        return value

    def clean_params(self):
        """Decode the ``params`` value from JSON, rejecting invalid JSON."""
        value = self.cleaned_data["params"]
        try:
            return json.loads(value)
        except ValueError as exc:
            raise ValidationError("Is not valid JSON") from exc

    def clean_alias(self):
        """Reject an ``alias`` value that is not a configured connection."""
        value = self.cleaned_data["alias"]
        if value not in connections:
            raise ValidationError(f"Database alias '{value}' not found")
        return value

    def clean(self):
        """
        Look up the recorded query named by ``request_id``/``djdt_query_id``.

        Stores the matching query under ``cleaned_data["query"]``.

        Raises:
            ValidationError: if the toolbar data for the request cannot be
                fetched, if no recorded query has the given id, or if the
                recorded statement is not a SELECT.
        """
        # Imported here rather than at module level, presumably to avoid a
        # circular import with the SQL panel package.
        from debug_toolbar.panels.sql import SQLPanel

        cleaned_data = super().clean()

        # Fields that failed their own validation are absent from
        # cleaned_data, but clean() still runs. Their errors are already
        # recorded, so stop here instead of raising KeyError.
        request_id = cleaned_data.get("request_id")
        query_id = cleaned_data.get("djdt_query_id")
        if request_id is None or query_id is None:
            return cleaned_data

        toolbar = DebugToolbar.fetch(request_id, panel_id=SQLPanel.panel_id)
        if toolbar is None:
            raise ValidationError(_("Data for this panel isn't available anymore."))

        panel = toolbar.get_panel_by_id(SQLPanel.panel_id)
        # Find the query for this form submission
        recorded = panel.get_stats()["queries"]
        query = next((q for q in recorded if q["djdt_query_id"] == query_id), None)
        if query is None:
            raise ValidationError(_("Invalid query id."))

        # The statement is executed again by select()/explain()/profile(),
        # so only read-only SELECT statements are accepted.
        if not is_select_query(query["raw_sql"]):
            raise ValidationError(_("Only 'select' queries are allowed."))

        cleaned_data["query"] = query
        return cleaned_data

    def _sql_and_params(self):
        """Return the validated query's raw SQL and its JSON-decoded params."""
        query = self.cleaned_data["query"]
        return query["raw_sql"], json.loads(query["params"])

    def select(self):
        """Execute the recorded query and return ``(rows, column_headers)``."""
        sql, params = self._sql_and_params()
        with self.cursor as cursor:
            cursor.execute(sql, params)
            headers = [d[0] for d in cursor.description]
            result = cursor.fetchall()
        return result, headers

    def explain(self):
        """
        Run the vendor-appropriate EXPLAIN for the recorded query.

        Returns ``(rows, column_headers)`` of the EXPLAIN output.
        """
        sql, params = self._sql_and_params()
        vendor = self.cleaned_data["query"]["vendor"]
        if vendor == "sqlite":
            # SQLite's EXPLAIN dumps the low-level opcodes generated for a query;
            # EXPLAIN QUERY PLAN dumps a more human-readable summary
            # See https://www.sqlite.org/lang_explain.html for details
            prefix = "EXPLAIN QUERY PLAN"
        elif vendor == "postgresql":
            # NOTE: on PostgreSQL, EXPLAIN ANALYZE actually executes the
            # statement in order to report real timings.
            prefix = "EXPLAIN ANALYZE"
        else:
            prefix = "EXPLAIN"
        with self.cursor as cursor:
            cursor.execute(f"{prefix} {sql}", params)
            headers = [d[0] for d in cursor.description]
            result = cursor.fetchall()
        return result, headers

    def profile(self):
        """
        Execute the recorded query with session profiling enabled.

        Uses ``SET PROFILING`` and ``information_schema.profiling``, so it
        only works on databases that provide them. Returns
        ``(rows, column_headers)`` of the profiling rows for the most recent
        profiled statement.
        """
        sql, params = self._sql_and_params()
        with self.cursor as cursor:
            cursor.execute("SET PROFILING=1")  # Enable profiling
            try:
                cursor.execute(sql, params)  # Execute SELECT
            finally:
                # Always switch profiling back off, even when the statement
                # fails, so the setting does not stay enabled on the session.
                cursor.execute("SET PROFILING=0")  # Disable profiling
            # The Query ID should always be 1 here but I'll subselect to get
            # the last one just in case...
            cursor.execute(
                """
                SELECT *
                FROM information_schema.profiling
                WHERE query_id = (
                    SELECT query_id
                    FROM information_schema.profiling
                    ORDER BY query_id DESC
                    LIMIT 1
                )
                """
            )
            headers = [d[0] for d in cursor.description]
            result = cursor.fetchall()
        return result, headers

    def reformat_sql(self):
        """Return the recorded query's ``"sql"`` reformatted without a toggle."""
        # Inside a method the bare name resolves to the module-level
        # ``reformat_sql`` helper, not to this method.
        return reformat_sql(self.cleaned_data["query"]["sql"], with_toggle=False)

    @property
    def connection(self):
        """The database connection for the validated query's ``alias``."""
        return connections[self.cleaned_data["query"]["alias"]]

    @cached_property
    def cursor(self):
        """A cursor on ``connection``, created once per form instance."""
        # NOTE(review): select()/explain()/profile() use this cursor as a
        # context manager, which presumably closes it on exit. Because the
        # value is cached, a form instance should run only one of them.
        return self.connection.cursor()
|