1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
from django.http import HttpResponseBadRequest, JsonResponse
from django.template.loader import render_to_string
from django.views.decorators.csrf import csrf_exempt
from debug_toolbar._compat import login_not_required
from debug_toolbar.decorators import render_with_toolbar_language, require_show_toolbar
from debug_toolbar.forms import SignedDataForm
from debug_toolbar.panels.sql.forms import SQLSelectForm
def get_signed_data(request):
    """Return the verified payload carried by the request's signed data form.

    The form data is taken from ``request.GET`` for GET requests and from
    ``request.POST`` for every other method. ``None`` is returned when the
    ``SignedDataForm`` built from that data does not validate.
    """
    if request.method == "GET":
        payload = request.GET
    else:
        payload = request.POST

    form = SignedDataForm(payload)
    if not form.is_valid():
        return None
    return form.verified_data()
@csrf_exempt
@login_not_required
@require_show_toolbar
@render_with_toolbar_language
def sql_select(request):
    """Run the signed SQL statement and return its rows as rendered HTML.

    The response is a ``JsonResponse`` whose ``"content"`` key holds the
    rendered ``sql_select.html`` template. An ``HttpResponseBadRequest`` is
    returned when the signed data cannot be verified ("Invalid signature")
    or when ``SQLSelectForm`` rejects it ("Form errors").
    """
    signed_payload = get_signed_data(request)
    if not signed_payload:
        return HttpResponseBadRequest("Invalid signature")

    select_form = SQLSelectForm(signed_payload)
    if not select_form.is_valid():
        return HttpResponseBadRequest("Form errors")

    statement = select_form.cleaned_data["raw_sql"]
    arguments = select_form.cleaned_data["params"]

    with select_form.cursor as db_cursor:
        db_cursor.execute(statement, arguments)
        # The first item of each description entry is the column name.
        column_names = [column[0] for column in db_cursor.description]
        rows = db_cursor.fetchall()

    template_context = {
        "result": rows,
        "sql": select_form.reformat_sql(),
        "duration": select_form.cleaned_data["duration"],
        "headers": column_names,
        "alias": select_form.cleaned_data["alias"],
    }
    html = render_to_string(
        "debug_toolbar/panels/sql_select.html", template_context
    )
    return JsonResponse({"content": html})
@csrf_exempt
@login_not_required
@require_show_toolbar
@render_with_toolbar_language
def sql_explain(request):
    """Run EXPLAIN on the signed SQL statement and return the plan as HTML.

    The EXPLAIN variant depends on the connection's vendor. The response is
    a ``JsonResponse`` whose ``"content"`` key holds the rendered
    ``sql_explain.html`` template. An ``HttpResponseBadRequest`` is returned
    when the signed data cannot be verified ("Invalid signature") or when
    ``SQLSelectForm`` rejects it ("Form errors").
    """
    signed_payload = get_signed_data(request)
    if not signed_payload:
        return HttpResponseBadRequest("Invalid signature")

    explain_form = SQLSelectForm(signed_payload)
    if not explain_form.is_valid():
        return HttpResponseBadRequest("Form errors")

    sql = explain_form.cleaned_data["raw_sql"]
    arguments = explain_form.cleaned_data["params"]
    vendor = explain_form.connection.vendor

    if vendor == "sqlite":
        # SQLite's EXPLAIN dumps the low-level opcodes generated for a query;
        # EXPLAIN QUERY PLAN dumps a more human-readable summary
        # See https://www.sqlite.org/lang_explain.html for details
        explain_statement = f"EXPLAIN QUERY PLAN {sql}"
    elif vendor == "postgresql":
        explain_statement = f"EXPLAIN ANALYZE {sql}"
    else:
        explain_statement = f"EXPLAIN {sql}"

    with explain_form.cursor as db_cursor:
        db_cursor.execute(explain_statement, arguments)
        # The first item of each description entry is the column name.
        column_names = [column[0] for column in db_cursor.description]
        plan_rows = db_cursor.fetchall()

    template_context = {
        "result": plan_rows,
        "sql": explain_form.reformat_sql(),
        "duration": explain_form.cleaned_data["duration"],
        "headers": column_names,
        "alias": explain_form.cleaned_data["alias"],
    }
    html = render_to_string(
        "debug_toolbar/panels/sql_explain.html", template_context
    )
    return JsonResponse({"content": html})
@csrf_exempt
@login_not_required
@require_show_toolbar
@render_with_toolbar_language
def sql_profile(request):
    """Run the signed SQL statement with profiling on and return the stats.

    Profiling is switched on with ``SET PROFILING=1``, the statement is
    executed, profiling is switched off again, and the rows for the most
    recent query are read from ``information_schema.profiling``.

    The response is a ``JsonResponse`` whose ``"content"`` key holds the
    rendered ``sql_profile.html`` template. If any of the database calls
    fail, the template still renders, with ``result`` and ``headers`` left
    as ``None`` and ``result_error`` set to an explanatory message. An
    ``HttpResponseBadRequest`` is returned when the signed data cannot be
    verified ("Invalid signature") or when ``SQLSelectForm`` rejects it
    ("Form errors").
    """
    verified_data = get_signed_data(request)
    if not verified_data:
        return HttpResponseBadRequest("Invalid signature")
    form = SQLSelectForm(verified_data)

    if form.is_valid():
        sql = form.cleaned_data["raw_sql"]
        params = form.cleaned_data["params"]
        result = None
        headers = None
        result_error = None
        with form.cursor as cursor:
            # Deliberately broad: any failure here (profiling unsupported,
            # statement error, ...) is reported to the user via result_error
            # instead of surfacing as a server error.
            try:
                cursor.execute("SET PROFILING=1")  # Enable profiling
                try:
                    cursor.execute(sql, params)  # Execute SELECT
                finally:
                    # Always switch profiling back off, even when the
                    # statement itself fails, so the setting does not stay
                    # enabled on the connection after this request.
                    cursor.execute("SET PROFILING=0")  # Disable profiling
                # The Query ID should always be 1 here but I'll subselect to get
                # the last one just in case...
                cursor.execute(
                    """
                    SELECT *
                    FROM information_schema.profiling
                    WHERE query_id = (
                        SELECT query_id
                        FROM information_schema.profiling
                        ORDER BY query_id DESC
                        LIMIT 1
                    )
                    """
                )
                # The first item of each description entry is the column name.
                headers = [d[0] for d in cursor.description]
                result = cursor.fetchall()
            except Exception:
                result_error = (
                    "Profiling is either not available or not supported by your "
                    "database."
                )

        context = {
            "result": result,
            "result_error": result_error,
            "sql": form.reformat_sql(),
            "duration": form.cleaned_data["duration"],
            "headers": headers,
            "alias": form.cleaned_data["alias"],
        }
        content = render_to_string("debug_toolbar/panels/sql_profile.html", context)
        return JsonResponse({"content": content})
    return HttpResponseBadRequest("Form errors")
|