File: LoggingHandler.py

package info (click to toggle)
python-applicationinsights 0.11.10-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 876 kB
  • sloc: python: 5,948; makefile: 151; sh: 77
file content (132 lines) | stat: -rw-r--r-- 5,699 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import logging
import applicationinsights
from weakref import WeakValueDictionary
from applicationinsights.channel import AsynchronousSender, AsynchronousQueue
from applicationinsights.channel import SynchronousSender, SynchronousQueue
from applicationinsights.channel import TelemetryChannel

# Registry of handlers installed by enable(), keyed by instrumentation key.
# Values are held weakly, so this mapping never keeps a handler alive on its
# own: an entry disappears once nothing else (such as the root logger's
# handler list) still references the handler.
enabled_instrumentation_keys = WeakValueDictionary()

def enable(instrumentation_key, *args, **kwargs):
    """Enables the Application Insights logging handler for the root logger for the supplied instrumentation key.

    Multiple calls to this function with different instrumentation keys result in multiple handler instances.
    Calling it again with a key that is already enabled replaces that key's handler on the root logger with a
    newly created one. If the call fails (invalid arguments, handler construction error), the previously
    installed handler is left in place.

    .. code:: python

        import logging
        from applicationinsights.logging import enable

        # set up logging
        enable('<YOUR INSTRUMENTATION KEY GOES HERE>')

        # log something (this will be sent to the Application Insights service as a trace)
        logging.info('This is a message')

        # logging shutdown will cause a flush of all un-sent telemetry items
        # alternatively set up an async channel via enable('<YOUR INSTRUMENTATION KEY GOES HERE>', async_=True)

    Args:
        instrumentation_key (str). the instrumentation key to use while sending telemetry to the service.
        *args: additional positional arguments forwarded to :class:`LoggingHandler`.

    Keyword Args:
        async_ (bool): Whether to use an async channel for the telemetry. Defaults to False.
        endpoint (str): The custom endpoint to which to send the telemetry. Defaults to None.
        level (Union[int, str]): The level to set on the created handler. Defaults to INFO.
        telemetry_channel: A ready-made channel to use instead of building one. Cannot be combined with
            ``async_`` or ``endpoint``.
        **kwargs: any remaining keyword arguments are forwarded to :class:`LoggingHandler`.

    Returns:
        :class:`LoggingHandler`. the newly created handler, already attached to the root logger.

    Raises:
        Exception: if no instrumentation key is supplied, or if ``telemetry_channel`` is combined with
            ``async_`` or ``endpoint``.
    """
    if not instrumentation_key:
        raise Exception('Instrumentation key was required but not provided')

    # Pull out the options that are consumed here; whatever remains in kwargs
    # is handed to the handler's constructor.
    async_ = kwargs.pop('async_', False)
    endpoint = kwargs.pop('endpoint', None)
    log_level = kwargs.pop('level', logging.INFO)
    telemetry_channel = kwargs.get('telemetry_channel')

    # Validate before touching the root logger so that a rejected call does
    # not leave the key without any handler attached.
    if telemetry_channel and async_:
        raise Exception('Incompatible arguments async_ and telemetry_channel')
    if telemetry_channel and endpoint:
        raise Exception('Incompatible arguments endpoint and telemetry_channel')

    if not telemetry_channel:
        if async_:
            sender, queue = AsynchronousSender, AsynchronousQueue
        else:
            sender, queue = SynchronousSender, SynchronousQueue
        kwargs['telemetry_channel'] = TelemetryChannel(queue=queue(sender(endpoint)))

    # Build and configure the replacement completely before swapping it in;
    # both steps can raise (bad constructor arguments, invalid level).
    handler = LoggingHandler(instrumentation_key, *args, **kwargs)
    handler.setLevel(log_level)

    root_logger = logging.getLogger()
    # Single lookup: with weak values, a separate membership test followed by
    # indexing could see the entry vanish in between.
    previous_handler = enabled_instrumentation_keys.get(instrumentation_key)
    if previous_handler is not None:
        root_logger.removeHandler(previous_handler)
    enabled_instrumentation_keys[instrumentation_key] = handler
    root_logger.addHandler(handler)
    return handler


class LoggingHandler(logging.Handler):
    """This class represents an integration point between Python's logging framework and the Application Insights
    service.

    Logging records are sent to the service either as simple Trace telemetry or as Exception telemetry (in the case
    of exception information being available).

    .. code:: python

        import logging
        from applicationinsights.logging import LoggingHandler

        # set up logging
        handler = LoggingHandler('<YOUR INSTRUMENTATION KEY GOES HERE>')
        logging.basicConfig(handlers=[ handler ], format='%(levelname)s: %(message)s', level=logging.DEBUG)

        # log something (this will be sent to the Application Insights service as a trace)
        logging.info('This is a message')

        # logging shutdown will cause a flush of all un-sent telemetry items
        # alternatively flush manually via handler.flush()
    """
    def __init__(self, instrumentation_key, *args, **kwargs):
        """
        Initialize a new instance of the class.

        Args:
            instrumentation_key (str). the instrumentation key to use while sending telemetry to the service.
            *args: positional arguments forwarded to :class:`logging.Handler`.

        Keyword Args:
            telemetry_channel: optional channel handed to the telemetry client. Defaults to None.
            **kwargs: remaining keyword arguments are forwarded to :class:`logging.Handler`.

        Raises:
            Exception: if no instrumentation key is supplied.
        """
        if not instrumentation_key:
            raise Exception('Instrumentation key was required but not provided')
        # The base handler does not know this option, so take it out of kwargs
        # before delegating to the base constructor.
        telemetry_channel = kwargs.pop('telemetry_channel', None)
        self.client = applicationinsights.TelemetryClient(instrumentation_key, telemetry_channel)
        super(LoggingHandler, self).__init__(*args, **kwargs)

    def flush(self):
        """Flushes the queued up telemetry to the service.
        """
        self.client.flush()
        return super(LoggingHandler, self).flush()

    def emit(self, record):
        """Emit a record.

        If a formatter is specified, it is used to format the record. If exception information is present, an Exception
        telemetry object is sent instead of a Trace telemetry object; the formatted message is not sent in that case.

        Any exception raised while building or sending the telemetry is routed to :meth:`handleError` instead of
        propagating into the code that issued the logging call.

        Args:
            record (:class:`logging.LogRecord`). the record to format and send.
        """
        try:
            # the set of properties that will ride with the record
            properties = {
                'process': record.processName,
                'module': record.module,
                'fileName': record.filename,
                'lineNumber': record.lineno,
                'level': record.levelname,
            }

            # if we have exc_info, we will use it as an exception
            if record.exc_info:
                self.client.track_exception(*record.exc_info, properties=properties)
                return

            # if we don't, simply format the message and send the trace
            formatted_message = self.format(record)
            self.client.track_trace(formatted_message, properties=properties, severity=record.levelname)
        except Exception:
            # a failing log call must not take down the application; let the
            # logging framework decide how to report it
            self.handleError(record)