File: TelemetryChannel.py

package info (click to toggle)
python-applicationinsights 0.11.10-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 876 kB
  • sloc: python: 5,948; makefile: 151; sh: 77
file content (123 lines) | stat: -rw-r--r-- 4,846 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import datetime
import sys

from .SynchronousQueue import SynchronousQueue
from .SynchronousSender import SynchronousSender
from .TelemetryContext import TelemetryContext
from applicationinsights.channel import contracts

platform_moniker = 'py2'
if sys.version_info >= (3, 0):
    platform_moniker = 'py3'

# set up internal context
internal_context = contracts.Internal()
internal_context.sdk_version = platform_moniker + ':0.11.10'


class TelemetryChannel(object):
    """The telemetry channel is responsible for constructing a :class:`contracts.Envelope` object from the passed in
    data and specified telemetry context.

    .. code:: python

        from application_insights.channel import TelemetryChannel, contracts
        channel = TelemetryChannel()
        event = contracts.EventData()
        event.name = 'My event'
        channel.write(event)
    """

    def __init__(self, context=None, queue=None):
        """Initializes a new instance of the class.

        Args:
            context (:class:`TelemetryContext') the telemetry context to use when sending telemetry data.\n
            queue (:class:`QueueBase`) the queue to enqueue the resulting :class:`contracts.Envelope` to.
        """
        self._context = context or TelemetryContext()
        self._queue = queue or SynchronousQueue(SynchronousSender())

    @property
    def context(self):
        """The context associated with this channel. All :class:`contracts.Envelope` objects created by this channel
        will use this value if it's present or if none is specified as part of the :func:`write` call.

        Returns:
            (:class:`TelemetryContext`). the context instance (defaults to: TelemetryContext())
        """
        return self._context

    @property
    def queue(self):
        """The queue associated with this channel. All :class:`contracts.Envelope` objects created by this channel
        will be pushed to this queue.

        Returns:
            (:class:`QueueBase`). the queue instance (defaults to: SynchronousQueue())
        """
        return self._queue

    @property
    def sender(self):
        """The sender associated with this channel. This instance will be used to transmit telemetry to the service.

        Returns:
            (:class:`SenderBase`). the sender instance (defaults to: SynchronousSender())
        """
        return self._queue.sender

    def flush(self):
        """Flushes the enqueued data by calling :func:`flush` on :func:`queue`.
        """
        self._queue.flush()

    def write(self, data, context=None):
        """Enqueues the passed in data to the :func:`queue`. If the caller specifies a context as well, it will
        take precedence over the instance in :func:`context`.

        Args:
            data (object). data the telemetry data to send. This will be wrapped in an :class:`contracts.Envelope`
                before being enqueued to the :func:`queue`.
            context (:class:`TelemetryContext`). context the override context to use when constructing the
                :class:`contracts.Envelope`.
        """
        local_context = context or self._context
        if not local_context:
            raise Exception('Context was required but not provided')

        if not data:
            raise Exception('Data was required but not provided')

        envelope = contracts.Envelope()
        envelope.name = data.ENVELOPE_TYPE_NAME
        envelope.time = datetime.datetime.utcnow().isoformat() + 'Z'
        envelope.ikey = local_context.instrumentation_key
        tags = envelope.tags
        for prop_context in [self._context, context]:
            if not prop_context:
                continue
            for key, value in self._write_tags(prop_context):
                tags[key] = value
        envelope.data = contracts.Data()
        envelope.data.base_type = data.DATA_TYPE_NAME
        for prop_context in [context, self._context]:
            if not prop_context:
                continue
            if hasattr(data, 'properties') and prop_context.properties:
                properties = data.properties
                for key in prop_context.properties:
                    if key not in properties:
                        properties[key] = prop_context.properties[key]
        envelope.data.base_data = data

        self._queue.put(envelope)

    def _write_tags(self, context):
        for item in [internal_context,
                     context.device, context.cloud, context.application, context.user,
                     context.session, context.location, context.operation]:
            if not item:
                continue
            for pair in item.write().items():
                yield pair