1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import os
import shutil
import unittest
from unittest import mock
from datetime import datetime
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.metrics.export import NumberDataPoint
from azure.core.exceptions import HttpResponseError
from azure.monitor.opentelemetry.exporter.export._base import ExportResult
from azure.monitor.opentelemetry.exporter.statsbeat._exporter import _StatsBeatExporter
from azure.monitor.opentelemetry.exporter.statsbeat._state import _STATSBEAT_STATE
from azure.monitor.opentelemetry.exporter._constants import _STATSBEAT_METRIC_NAME_MAPPINGS
from azure.monitor.opentelemetry.exporter._generated import AzureMonitorClient
from azure.monitor.opentelemetry.exporter._generated.models import (
TelemetryErrorDetails,
TelemetryItem,
TrackResponse,
)
def throw(exc_type, *args, **kwargs):
def func(*_args, **_kwargs):
raise exc_type(*args, **kwargs)
return func
def clean_folder(folder):
if os.path.isfile(folder):
for filename in os.listdir(folder):
file_path = os.path.join(folder, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
print("Failed to delete %s. Reason: %s" % (file_path, e))
# pylint: disable=protected-access
class TestStatsbeatExporter(unittest.TestCase):
@classmethod
def setUpClass(cls):
os.environ.clear()
os.environ["APPINSIGHTS_INSTRUMENTATIONKEY"] = "1234abcd-5678-4efa-8abc-1234567890ab"
os.environ["APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"] = "false"
cls._exporter = _StatsBeatExporter(
disable_offline_storage=True,
)
cls._envelopes_to_export = [TelemetryItem(name="Test", time=datetime.now())]
@mock.patch("azure.monitor.opentelemetry.exporter.statsbeat._statsbeat.collect_statsbeat_metrics")
def test_init(self, collect_mock):
exporter = _StatsBeatExporter(disable_offline_storage=True)
self.assertFalse(exporter._should_collect_stats())
collect_mock.assert_not_called()
def test_point_to_envelope(self):
resource = Resource.create(attributes={"asd": "test_resource"})
point = NumberDataPoint(
start_time_unix_nano=1646865018558419456,
time_unix_nano=1646865018558419457,
value=10,
attributes={},
)
for ot_name, sb_name in _STATSBEAT_METRIC_NAME_MAPPINGS.items():
envelope = self._exporter._point_to_envelope(point, ot_name, resource)
self.assertEqual(envelope.data.base_data.metrics[0].name, sb_name)
def test_transmit_200_reach_ingestion(self):
_STATSBEAT_STATE["INITIAL_SUCCESS"] = False
with mock.patch.object(AzureMonitorClient, "track") as post:
post.return_value = TrackResponse(
items_received=1,
items_accepted=1,
errors=[],
)
result = self._exporter._transmit(self._envelopes_to_export)
self.assertTrue(_STATSBEAT_STATE["INITIAL_SUCCESS"])
self.assertEqual(result, ExportResult.SUCCESS)
def test_transmit_206_reach_ingestion(self):
_STATSBEAT_STATE["INITIAL_SUCCESS"] = False
with mock.patch.object(AzureMonitorClient, "track") as post:
post.return_value = TrackResponse(
items_received=3,
items_accepted=1,
errors=[TelemetryErrorDetails(index=0, status_code=500, message="should retry")],
)
result = self._exporter._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)
self.assertTrue(_STATSBEAT_STATE["INITIAL_SUCCESS"])
def test_transmit_reach_ingestion_code(self):
_STATSBEAT_STATE["INITIAL_SUCCESS"] = False
with mock.patch("azure.monitor.opentelemetry.exporter.export._base._reached_ingestion_code") as m, mock.patch(
"azure.monitor.opentelemetry.exporter.export._base._is_retryable_code"
) as p:
m.return_value = True
p.return_value = True
with mock.patch.object(AzureMonitorClient, "track", throw(HttpResponseError)):
result = self._exporter._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_RETRYABLE)
self.assertTrue(_STATSBEAT_STATE["INITIAL_SUCCESS"])
def test_transmit_not_reach_ingestion_code(self):
_STATSBEAT_STATE["INITIAL_SUCCESS"] = False
_STATSBEAT_STATE["INITIAL_FAILURE_COUNT"] = 1
with mock.patch("azure.monitor.opentelemetry.exporter.export._base._reached_ingestion_code") as m, mock.patch(
"azure.monitor.opentelemetry.exporter.export._base._is_retryable_code"
) as p:
m.return_value = False
p.return_value = False
with mock.patch.object(AzureMonitorClient, "track", throw(HttpResponseError)):
result = self._exporter._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)
self.assertFalse(_STATSBEAT_STATE["INITIAL_SUCCESS"])
self.assertEqual(_STATSBEAT_STATE["INITIAL_FAILURE_COUNT"], 2)
def test_transmit_not_reach_ingestion_exception(self):
_STATSBEAT_STATE["INITIAL_SUCCESS"] = False
_STATSBEAT_STATE["INITIAL_FAILURE_COUNT"] = 1
with mock.patch("azure.monitor.opentelemetry.exporter.statsbeat._statsbeat.shutdown_statsbeat_metrics") as m:
with mock.patch.object(AzureMonitorClient, "track", throw(Exception)):
result = self._exporter._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)
self.assertFalse(_STATSBEAT_STATE["INITIAL_SUCCESS"])
self.assertEqual(_STATSBEAT_STATE["INITIAL_FAILURE_COUNT"], 2)
m.assert_not_called()
def test_transmit_not_reach_ingestion_exception_shutdown(self):
_STATSBEAT_STATE["INITIAL_SUCCESS"] = False
_STATSBEAT_STATE["INITIAL_FAILURE_COUNT"] = 2
with mock.patch("azure.monitor.opentelemetry.exporter.statsbeat._statsbeat.shutdown_statsbeat_metrics") as m:
with mock.patch.object(AzureMonitorClient, "track", throw(Exception)):
result = self._exporter._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)
self.assertFalse(_STATSBEAT_STATE["INITIAL_SUCCESS"])
self.assertEqual(_STATSBEAT_STATE["INITIAL_FAILURE_COUNT"], 3)
m.assert_called_once()
|