1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
|
import gzip
from unittest import TestCase
from wsgiref.util import setup_testing_defaults
from prometheus_client import CollectorRegistry, Counter, make_wsgi_app
from prometheus_client.exposition import _bake_output, CONTENT_TYPE_LATEST
class WSGITest(TestCase):
def setUp(self):
self.registry = CollectorRegistry()
self.captured_status = None
self.captured_headers = None
# Setup WSGI environment
self.environ = {}
setup_testing_defaults(self.environ)
def capture(self, status, header):
self.captured_status = status
self.captured_headers = header
def increment_metrics(self, metric_name, help_text, increments):
c = Counter(metric_name, help_text, registry=self.registry)
for _ in range(increments):
c.inc()
def assert_outputs(self, outputs, metric_name, help_text, increments, compressed):
self.assertEqual(len(outputs), 1)
if compressed:
output = gzip.decompress(outputs[0]).decode(encoding="utf-8")
else:
output = outputs[0].decode('utf8')
# Status code
self.assertEqual(self.captured_status, "200 OK")
# Headers
num_of_headers = 2 if compressed else 1
self.assertEqual(len(self.captured_headers), num_of_headers)
self.assertIn(("Content-Type", CONTENT_TYPE_LATEST), self.captured_headers)
if compressed:
self.assertIn(("Content-Encoding", "gzip"), self.captured_headers)
# Body
self.assertIn("# HELP " + metric_name + "_total " + help_text + "\n", output)
self.assertIn("# TYPE " + metric_name + "_total counter\n", output)
self.assertIn(metric_name + "_total " + str(increments) + ".0\n", output)
def validate_metrics(self, metric_name, help_text, increments):
"""
WSGI app serves the metrics from the provided registry.
"""
self.increment_metrics(metric_name, help_text, increments)
# Create and run WSGI app
app = make_wsgi_app(self.registry)
outputs = app(self.environ, self.capture)
# Assert outputs
self.assert_outputs(outputs, metric_name, help_text, increments, compressed=False)
def test_report_metrics_1(self):
self.validate_metrics("counter", "A counter", 2)
def test_report_metrics_2(self):
self.validate_metrics("counter", "Another counter", 3)
def test_report_metrics_3(self):
self.validate_metrics("requests", "Number of requests", 5)
def test_report_metrics_4(self):
self.validate_metrics("failed_requests", "Number of failed requests", 7)
def test_favicon_path(self):
from unittest.mock import patch
# Create mock to enable counting access of _bake_output
with patch("prometheus_client.exposition._bake_output", side_effect=_bake_output) as mock:
# Create and run WSGI app
app = make_wsgi_app(self.registry)
# Try accessing the favicon path
favicon_environ = dict(self.environ)
favicon_environ['PATH_INFO'] = '/favicon.ico'
outputs = app(favicon_environ, self.capture)
# Test empty response
self.assertEqual(outputs, [b''])
self.assertEqual(mock.call_count, 0)
# Try accessing normal paths
app(self.environ, self.capture)
self.assertEqual(mock.call_count, 1)
def test_gzip(self):
# Increment a metric
metric_name = "counter"
help_text = "A counter"
increments = 2
self.increment_metrics(metric_name, help_text, increments)
app = make_wsgi_app(self.registry)
# Try accessing metrics using the gzip Accept-Content header.
gzip_environ = dict(self.environ)
gzip_environ['HTTP_ACCEPT_ENCODING'] = 'gzip'
outputs = app(gzip_environ, self.capture)
# Assert outputs are compressed.
self.assert_outputs(outputs, metric_name, help_text, increments, compressed=True)
def test_gzip_disabled(self):
# Increment a metric
metric_name = "counter"
help_text = "A counter"
increments = 2
self.increment_metrics(metric_name, help_text, increments)
# Disable compression explicitly.
app = make_wsgi_app(self.registry, disable_compression=True)
# Try accessing metrics using the gzip Accept-Content header.
gzip_environ = dict(self.environ)
gzip_environ['HTTP_ACCEPT_ENCODING'] = 'gzip'
outputs = app(gzip_environ, self.capture)
# Assert outputs are not compressed.
self.assert_outputs(outputs, metric_name, help_text, increments, compressed=False)
|