# Tests for the LoggerProvider logger cache.
import logging
import unittest
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
InMemoryLogRecordExporter,
SimpleLogRecordProcessor,
)
def set_up_logging_handler(level):
    """Build a ``LoggingHandler`` backed by a fresh in-memory pipeline.

    Args:
        level: Logging level handed to the ``LoggingHandler``.

    Returns:
        A ``(handler, logger_provider)`` tuple. The provider has a
        ``SimpleLogRecordProcessor`` registered on it that exports to an
        ``InMemoryLogRecordExporter``.
    """
    provider = LoggerProvider()
    provider.add_log_record_processor(
        SimpleLogRecordProcessor(exporter=InMemoryLogRecordExporter())
    )
    return LoggingHandler(level=level, logger_provider=provider), provider
def create_logger(handler, name):
    """Return the stdlib logger called ``name`` with ``handler`` attached.

    Args:
        handler: Handler to add to the logger.
        name: Name passed to ``logging.getLogger``.

    Returns:
        The logger obtained from ``logging.getLogger(name)``.
    """
    named_logger = logging.getLogger(name)
    named_logger.addHandler(handler)
    return named_logger
class TestLoggerProviderCache(unittest.TestCase):
    """Check how ``LoggerProvider`` populates its internal logger cache."""

    # Number of repeated emissions used to confirm the cache does not grow.
    _ROUNDS = 100

    def _attach(self, handler, name):
        """Attach ``handler`` to logger ``name`` and detach it after the test.

        ``logging.getLogger`` hands out process-wide logger objects, so a
        handler left attached would keep receiving records emitted by
        later tests that reuse the same logger name.
        """
        logger = create_logger(handler, name)
        self.addCleanup(logger.removeHandler, handler)
        return logger

    def test_get_logger_single_handler(self):
        """One stdlib logger yields exactly one cached SDK logger."""
        handler, logger_provider = set_up_logging_handler(level=logging.DEBUG)
        # pylint: disable=protected-access
        logger_cache = logger_provider._logger_cache
        logger = self._attach(handler, "test_logger")

        # Ensure logger is lazily cached
        self.assertEqual(0, len(logger_cache))

        with self.assertLogs(level=logging.WARNING):
            logger.warning("test message")

        self.assertEqual(1, len(logger_cache))

        # Ensure only one logger is cached
        with self.assertLogs(level=logging.WARNING):
            for _ in range(self._ROUNDS):
                logger.warning("test message")

        self.assertEqual(1, len(logger_cache))

    def test_get_logger_multiple_loggers(self):
        """Each distinct stdlib logger name gets its own cache entry."""
        handler, logger_provider = set_up_logging_handler(level=logging.DEBUG)
        # pylint: disable=protected-access
        logger_cache = logger_provider._logger_cache

        num_loggers = 10
        loggers = [self._attach(handler, str(i)) for i in range(num_loggers)]

        # Ensure loggers are lazily cached
        self.assertEqual(0, len(logger_cache))

        with self.assertLogs(level=logging.WARNING):
            for logger in loggers:
                logger.warning("test message")

        self.assertEqual(num_loggers, len(logger_cache))

        # Repeated emissions must not add further cache entries
        with self.assertLogs(level=logging.WARNING):
            for _ in range(self._ROUNDS):
                for logger in loggers:
                    logger.warning("test message")

        self.assertEqual(num_loggers, len(logger_cache))

    def test_provider_get_logger_no_cache(self):
        """``get_logger`` with attributes leaves the cache empty."""
        _, logger_provider = set_up_logging_handler(level=logging.DEBUG)
        # pylint: disable=protected-access
        logger_cache = logger_provider._logger_cache

        logger_provider.get_logger(
            name="test_logger",
            version="version",
            schema_url="schema_url",
            attributes={"key": "value"},
        )

        # Ensure logger is not cached if attributes is set
        self.assertEqual(0, len(logger_cache))

    def test_provider_get_logger_cached(self):
        """Identical ``get_logger`` calls share a single cache entry."""
        _, logger_provider = set_up_logging_handler(level=logging.DEBUG)
        # pylint: disable=protected-access
        logger_cache = logger_provider._logger_cache

        logger_provider.get_logger(
            name="test_logger",
            version="version",
            schema_url="schema_url",
        )

        # Ensure only one logger is cached
        self.assertEqual(1, len(logger_cache))

        logger_provider.get_logger(
            name="test_logger",
            version="version",
            schema_url="schema_url",
        )

        # Ensure only one logger is cached
        self.assertEqual(1, len(logger_cache))
|