File: test_processor.py

package info (click to toggle)
python-azure 20251118%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 783,356 kB
  • sloc: python: 6,474,533; ansic: 804; javascript: 287; sh: 205; makefile: 198; xml: 109
file content (311 lines) | stat: -rw-r--r-- 13,098 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
import os
import unittest
from unittest import mock

from opentelemetry.sdk import _logs
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry._logs.severity import SeverityNumber
from opentelemetry.trace import TraceFlags, set_span_in_context, SpanContext, NonRecordingSpan

from azure.monitor.opentelemetry.exporter.export.logs._exporter import (
    AzureMonitorLogExporter,
)
from azure.monitor.opentelemetry.exporter.export.logs._processor import (
    _AzureBatchLogRecordProcessor,
)


# pylint: disable=protected-access
class TestAzureBatchLogRecordProcessor(unittest.TestCase):
    """Test cases for the Azure Monitor Batch Log Record Processor with trace-based sampling.

    Every ``on_emit`` test replaces ``BatchLogRecordProcessor.on_emit`` with a
    mock. A record counts as "forwarded" when that mock was called and as
    "filtered" when it was not.
    """

    # Patch targets and option key shared by the tests below.
    _PARENT_ON_EMIT = "opentelemetry.sdk._logs.export.BatchLogRecordProcessor.on_emit"
    _GET_CURRENT_SPAN = "azure.monitor.opentelemetry.exporter.export.logs._processor.get_current_span"
    _SAMPLING_OPTION = "enable_trace_based_sampling_for_logs"

    @classmethod
    def setUpClass(cls):
        """Create the shared exporter under a temporary, self-restoring environment."""
        env_patcher = mock.patch.dict(
            os.environ,
            {
                "APPINSIGHTS_INSTRUMENTATIONKEY": "1234abcd-5678-4efa-8abc-1234567890ab",
                "APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL": "true",
            },
        )
        env_patcher.start()
        # Registered before the exporter is built so the previous environment
        # is restored even when construction raises; class cleanups run in
        # that case, whereas tearDownClass would not.
        cls.addClassCleanup(env_patcher.stop)
        cls._exporter = AzureMonitorLogExporter()

    # ------------------------------------------------------------------
    # Helpers (names do not start with "test", so they are not collected)
    # ------------------------------------------------------------------

    def _make_processor(self, enabled):
        """Return a processor with trace-based sampling switched on or off.

        :param bool enabled: When true, the sampling option is passed as ``True``;
            otherwise an empty options dict is passed.
        """
        options = {self._SAMPLING_OPTION: True} if enabled else {}
        return _AzureBatchLogRecordProcessor(self._exporter, options=options)

    @staticmethod
    def _make_mock_span(is_valid, sampled):
        """Return a mock span whose ``get_span_context()`` reports the given state.

        Both attributes are always set explicitly: an attribute that is merely
        auto-created on a ``Mock`` is truthy, which would silently read as
        "valid" or "sampled".
        """
        span_context = mock.Mock()
        span_context.is_valid = is_valid
        span_context.trace_flags.sampled = sampled

        span = mock.Mock()
        span.get_span_context.return_value = span_context
        return span

    @staticmethod
    def _make_context(trace_flags):
        """Return a context carrying a non-recording span with ``trace_flags``."""
        span_context = SpanContext(
            trace_id=125960616039069540489478540494783893221,
            span_id=2909973987304607650,
            trace_flags=trace_flags,
            is_remote=False,
        )
        return set_span_in_context(NonRecordingSpan(span_context))

    @staticmethod
    def _make_log_data(context, body="Test log", timestamp=1646865018558419456):
        """Return an INFO ``LogData`` bound to ``context`` (which may be ``None``)."""
        return _logs.LogData(
            _logs.LogRecord(
                timestamp=timestamp,
                context=context,
                severity_text="INFO",
                severity_number=SeverityNumber.INFO,
                body=body,
            ),
            InstrumentationScope("test_name"),
        )

    def _emit(self, processor, log_data, current_span=None):
        """Call ``processor.on_emit(log_data)`` with the parent ``on_emit`` mocked.

        :param current_span: When given, ``get_current_span`` inside the
            processor module is patched to return it; otherwise the real
            function is left in place.
        :return: The mock that stood in for ``BatchLogRecordProcessor.on_emit``,
            so callers can assert whether the record was forwarded.
        """
        with mock.patch(self._PARENT_ON_EMIT) as parent_on_emit:
            if current_span is None:
                processor.on_emit(log_data)
            else:
                with mock.patch(self._GET_CURRENT_SPAN, return_value=current_span):
                    processor.on_emit(log_data)
        return parent_on_emit

    # ------------------------------------------------------------------
    # Initialization
    # ------------------------------------------------------------------

    def test_processor_initialization_without_trace_based_sampling(self):
        """Test processor initialization without trace-based sampling enabled."""
        processor = _AzureBatchLogRecordProcessor(self._exporter, options={})
        self.assertFalse(processor._enable_trace_based_sampling_for_logs)

    def test_processor_initialization_with_trace_based_sampling(self):
        """Test processor initialization with trace-based sampling enabled."""
        processor = _AzureBatchLogRecordProcessor(
            self._exporter,
            options={self._SAMPLING_OPTION: True},
        )
        self.assertTrue(processor._enable_trace_based_sampling_for_logs)

    def test_processor_initialization_without_options(self):
        """Test processor initialization without options."""
        processor = _AzureBatchLogRecordProcessor(self._exporter)
        self.assertIsNone(processor._enable_trace_based_sampling_for_logs)

    # ------------------------------------------------------------------
    # on_emit
    # ------------------------------------------------------------------

    def test_on_emit_with_trace_based_sampling_disabled(self):
        """Test on_emit does not filter logs when trace-based sampling is disabled."""
        processor = self._make_processor(enabled=False)
        log_data = self._make_log_data(self._make_context(TraceFlags.DEFAULT))

        parent_on_emit = self._emit(processor, log_data)

        # Parent on_emit should be called because trace-based sampling is disabled
        parent_on_emit.assert_called_once()

    def test_on_emit_with_trace_based_sampling_enabled_and_unsampled_trace(self): # cspell:disable-line
        """Test on_emit filters logs when trace-based sampling is enabled and trace is unsampled.""" # cspell:disable-line
        processor = self._make_processor(enabled=True)
        log_data = self._make_log_data(self._make_context(TraceFlags.DEFAULT))
        not_sampled_span = self._make_mock_span(is_valid=True, sampled=False)

        parent_on_emit = self._emit(processor, log_data, current_span=not_sampled_span)

        # Parent on_emit should NOT be called because the trace is not sampled and filtering is enabled
        parent_on_emit.assert_not_called()

    def test_on_emit_with_trace_based_sampling_enabled_and_sampled_trace(self):
        """Test on_emit does not filter logs when trace-based sampling is enabled and trace is sampled."""
        processor = self._make_processor(enabled=True)
        log_data = self._make_log_data(self._make_context(TraceFlags.SAMPLED))
        sampled_span = self._make_mock_span(is_valid=True, sampled=True)

        parent_on_emit = self._emit(processor, log_data, current_span=sampled_span)

        # Parent on_emit should be called because trace is sampled
        parent_on_emit.assert_called_once()

    def test_on_emit_with_trace_based_sampling_enabled_and_invalid_span_context(self):
        """Test on_emit does not filter logs with invalid span context."""
        processor = self._make_processor(enabled=True)
        log_data = self._make_log_data(self._make_context(TraceFlags.DEFAULT))
        # sampled=False is deliberate: with a truthy (auto-created) value the
        # record would be forwarded regardless of is_valid, and the test would
        # not exercise the validity check at all.
        invalid_span = self._make_mock_span(is_valid=False, sampled=False)

        parent_on_emit = self._emit(processor, log_data, current_span=invalid_span)

        # Parent on_emit should be called because span context is invalid
        parent_on_emit.assert_called_once()

    def test_on_emit_with_trace_based_sampling_enabled_and_no_context(self):
        """Test on_emit does not filter logs when there is no log record context."""
        processor = self._make_processor(enabled=True)
        log_data = self._make_log_data(None)

        parent_on_emit = self._emit(processor, log_data)

        # Parent on_emit should be called because there's no context
        parent_on_emit.assert_called_once()

    def test_on_emit_integration_with_multiple_log_records(self):
        """Integration test: verify processor handles multiple log records correctly with trace-based sampling."""
        processor = self._make_processor(enabled=True)

        not_sampled_span = self._make_mock_span(is_valid=True, sampled=False)
        sampled_span = self._make_mock_span(is_valid=True, sampled=True)

        not_sampled_log = self._make_log_data(
            self._make_context(TraceFlags.DEFAULT),
            body="Unsampled log", # cspell:disable-line
        )
        sampled_log = self._make_log_data(
            self._make_context(TraceFlags.SAMPLED),
            body="Sampled log",
            timestamp=1646865018558419457,
        )

        # The same processor instance handles both records; each emit gets a
        # fresh parent mock, so no manual reset is needed between them.
        filtered = self._emit(processor, not_sampled_log, current_span=not_sampled_span)
        filtered.assert_not_called()

        forwarded = self._emit(processor, sampled_log, current_span=sampled_span)
        forwarded.assert_called_once()