1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
import unittest
import sys, os, os.path
rootDirectory = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..', '..')
if rootDirectory not in sys.path:
sys.path.append(rootDirectory)
from applicationinsights import channel
class TestTelemetryChannel(unittest.TestCase):
def test_construct(self):
actual = channel.TelemetryChannel()
self.assertIsNotNone(actual)
def test_context_works_as_expected(self):
context_global = channel.TelemetryContext()
context_global.device.id = "global"
actual = channel.TelemetryChannel()
self.assertIsNotNone(actual.context)
actual = channel.TelemetryChannel(None)
self.assertIsNotNone(actual.context)
actual = channel.TelemetryChannel(context_global)
self.assertEqual(context_global, actual.context)
def test_queue_works_as_expected(self):
queue = object()
actual = channel.TelemetryChannel()
self.assertIsNotNone(actual.queue)
actual = channel.TelemetryChannel(None, None)
self.assertIsNotNone(actual.queue)
actual = channel.TelemetryChannel(None, queue)
self.assertEqual(queue, actual.queue)
def test_sender_works_as_expected(self):
actual = channel.TelemetryChannel()
self.assertIsNotNone(actual.sender)
def test_flush_works_as_expected(self):
queue = MockQueue()
actual = channel.TelemetryChannel(None, queue)
self.assertEqual(0, queue.flush_count)
actual.flush()
self.assertEqual(1, queue.flush_count)
def test_construct_with_context_and_sender(self):
mock_sender = MockTelemetrySender()
queue = channel.SynchronousQueue(mock_sender)
context_global = channel.TelemetryContext()
context_global.device.id = "global"
actual = channel.TelemetryChannel(context_global, queue)
actual.write(channel.contracts.MessageData())
actual.flush()
self.assertIsNotNone(mock_sender.data)
self.assertEqual("global", mock_sender.data.tags["ai.device.id"])
def test_write_with_no_data_raises_exception(self):
mock_sender = MockTelemetrySender()
queue = channel.SynchronousQueue(mock_sender)
actual = channel.TelemetryChannel(None, queue)
self.assertRaises(Exception, actual.write, None)
def test_write_transfers_local_convext_over_global_context(self):
mock_sender = MockTelemetrySender()
queue = channel.SynchronousQueue(mock_sender)
context_global = channel.TelemetryContext()
context_global.device.id = "global"
context_local = channel.TelemetryContext()
context_local.device.id = "local"
actual = channel.TelemetryChannel(context_global, queue)
actual.write(channel.contracts.MessageData(), context_local)
actual.flush()
self.assertIsNotNone(mock_sender.data)
self.assertEqual("local", mock_sender.data.tags["ai.device.id"])
def test_write_constructs_valid_envelope(self):
mock_sender = MockTelemetrySender()
queue = channel.SynchronousQueue(mock_sender)
context_global = channel.TelemetryContext()
context_global.instrumentation_key = "42"
actual = channel.TelemetryChannel(context_global, queue)
cases = [
channel.contracts.EventData(),
channel.contracts.MetricData(),
channel.contracts.MessageData(),
channel.contracts.PageViewData(),
channel.contracts.ExceptionData(),
]
for item in cases:
actual.write(item)
actual.flush()
self.assertIsNotNone(mock_sender.data)
self.assertTrue(isinstance(mock_sender.data, channel.contracts.Envelope))
self.assertEqual(item.ENVELOPE_TYPE_NAME, mock_sender.data.name)
self.assertIsNotNone(mock_sender.data.time)
self.assertEqual("42", mock_sender.data.ikey)
for key, value in context_global.device.write().items():
self.assertEqual(value, mock_sender.data.tags[key])
for key, value in context_global.application.write().items():
self.assertEqual(value, mock_sender.data.tags[key])
for key, value in context_global.user.write().items():
self.assertEqual(value, mock_sender.data.tags[key])
for key, value in context_global.session.write().items():
self.assertEqual(value, mock_sender.data.tags[key])
for key, value in context_global.location.write().items():
self.assertEqual(value, mock_sender.data.tags[key])
for key, value in context_global.operation.write().items():
self.assertEqual(value, mock_sender.data.tags[key])
self.assertIsNotNone(mock_sender.data.data)
self.assertEqual(item.DATA_TYPE_NAME, mock_sender.data.data.base_type)
self.assertEqual(item, mock_sender.data.data.base_data)
class MockQueue(object):
def __init__(self):
self.flush_count = 0
def flush(self):
self.flush_count += 1
class MockTelemetrySender(channel.TelemetryChannel().sender.__class__):
def __init__(self):
self.data = None
self.send_buffer_size = 1
def send(self, envelope):
self.data = envelope[0];
|