1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
import datetime
import logging
from django.utils.translation import gettext_lazy as _, ngettext as __
from debug_toolbar.panels import Panel
from debug_toolbar.utils import ThreadCollector
try:
import threading
except ImportError:
threading = None
MESSAGE_IF_STRING_REPRESENTATION_INVALID = "[Could not get log message]"
class LogCollector(ThreadCollector):
def collect(self, item, thread=None):
# Avoid logging SQL queries since they are already in the SQL panel
# TODO: Make this check whether SQL panel is enabled
if item.get("channel", "") == "django.db.backends":
return
super().collect(item, thread)
class ThreadTrackingHandler(logging.Handler):
def __init__(self, collector):
logging.Handler.__init__(self)
self.collector = collector
def emit(self, record):
try:
message = record.getMessage()
except Exception:
message = MESSAGE_IF_STRING_REPRESENTATION_INVALID
record = {
"message": message,
"time": datetime.datetime.fromtimestamp(record.created),
"level": record.levelname,
"file": record.pathname,
"line": record.lineno,
"channel": record.name,
}
self.collector.collect(record)
# We don't use enable/disable_instrumentation because logging is global.
# We can't add thread-local logging handlers. Hopefully logging is cheap.
collector = LogCollector()
logging_handler = ThreadTrackingHandler(collector)
logging.root.addHandler(logging_handler)
class LoggingPanel(Panel):
template = "debug_toolbar/panels/logging.html"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._records = {}
nav_title = _("Logging")
@property
def nav_subtitle(self):
stats = self.get_stats()
record_count = len(stats["records"]) if stats else None
return __("%(count)s message", "%(count)s messages", record_count) % {
"count": record_count
}
title = _("Log messages")
def process_request(self, request):
collector.clear_collection()
return super().process_request(request)
def generate_stats(self, request, response):
records = collector.get_collection()
self._records[threading.currentThread()] = records
collector.clear_collection()
self.record_stats({"records": records})
|