1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
|
"""Use the influxdb_client with python native logging."""
import logging
from influxdb_client import InfluxDBClient
class InfluxLoggingHandler(logging.Handler):
    """
    InfluxLoggingHandler instances dispatch logging events to influx.

    There is no need to set a Formatter.
    The formatted record is handed to the influx write api as `record`, and
    every item injected through `extra` is forwarded as a keyword argument.
    """

    # Attribute names that belong to the logging machinery rather than to the
    # caller's `extra` dict. `message` and `asctime` are not present on a fresh
    # LogRecord: they are added by Formatter.format() (asctime only when the
    # format string uses the time), so they have to be listed explicitly.
    DEFAULT_LOG_RECORD_KEYS = [*vars(logging.makeLogRecord({})), 'message', 'asctime']

    def __init__(self, *, url, token, org, bucket, client_args=None, write_api_args=None) -> None:
        """
        Initialize defaults.

        The arguments `client_args` and `write_api_args` can be dicts of kwargs.
        They are passed on to the InfluxDBClient and write_api calls respectively.

        :param url: forwarded to InfluxDBClient as `url`
        :param token: forwarded to InfluxDBClient as `token`
        :param org: forwarded to InfluxDBClient as `org`
        :param bucket: default `bucket` keyword used for every write
        :param client_args: optional dict of extra InfluxDBClient kwargs
        :param write_api_args: optional dict of kwargs for `client.write_api`
        """
        super().__init__()
        self.bucket = bucket
        client_args = {} if client_args is None else client_args
        self.client = InfluxDBClient(url=url, token=token, org=org, **client_args)
        write_api_args = {} if write_api_args is None else write_api_args
        self.write_api = self.client.write_api(**write_api_args)

    def __del__(self) -> None:
        """Make sure all resources are closed."""
        self.close()

    def close(self) -> None:
        """
        Close the write_api, client and logger.

        The influx attributes are looked up defensively: when `__init__` fails
        while creating the client or the write api, `__del__` still runs on the
        half-built handler and must not fail on the missing attributes.
        """
        write_api = getattr(self, 'write_api', None)
        if write_api is not None:
            write_api.close()
        client = getattr(self, 'client', None)
        if client is not None:
            client.close()
        super().close()

    def emit(self, record: logging.LogRecord) -> None:
        """
        Emit a record via the influxDB WriteApi.

        The result of the write call is returned unchanged. Any `Exception`
        raised while formatting or writing is routed to `handleError`;
        KeyboardInterrupt and SystemExit are not `Exception` subclasses and
        therefore propagate untouched.
        """
        try:
            message = self.format(record)
            # Collected after format(): DEFAULT_LOG_RECORD_KEYS filters out the
            # attributes that formatting adds to the record.
            extra = self._get_extra_values(record)
            return self.write_api.write(record=message, **extra)
        except Exception:
            self.handleError(record)

    def _get_extra_values(self, record: logging.LogRecord) -> dict:
        """
        Extract all items from the record that were injected via extra.

        Example: `logging.debug(msg, extra={key: value, ...})`.

        The result always starts with `bucket` set to the handler's bucket;
        an `extra` entry with the same key replaces it.
        """
        extra = {'bucket': self.bucket}
        extra.update({key: value for key, value in record.__dict__.items()
                      if key not in self.DEFAULT_LOG_RECORD_KEYS})
        return extra
|