File: test_wsgi.py

package info (click to toggle)
python-prometheus-client 0.21.1%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 768 kB
  • sloc: python: 6,309; makefile: 9
file content (114 lines) | stat: -rw-r--r-- 4,727 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import gzip
from unittest import TestCase
from wsgiref.util import setup_testing_defaults

from prometheus_client import CollectorRegistry, Counter, make_wsgi_app
from prometheus_client.exposition import _bake_output, CONTENT_TYPE_LATEST


class WSGITest(TestCase):
    def setUp(self):
        self.registry = CollectorRegistry()
        self.captured_status = None
        self.captured_headers = None
        # Setup WSGI environment
        self.environ = {}
        setup_testing_defaults(self.environ)

    def capture(self, status, header):
        self.captured_status = status
        self.captured_headers = header

    def increment_metrics(self, metric_name, help_text, increments):
        c = Counter(metric_name, help_text, registry=self.registry)
        for _ in range(increments):
            c.inc()

    def assert_outputs(self, outputs, metric_name, help_text, increments, compressed):
        self.assertEqual(len(outputs), 1)
        if compressed:
            output = gzip.decompress(outputs[0]).decode(encoding="utf-8")
        else:
            output = outputs[0].decode('utf8')
        # Status code
        self.assertEqual(self.captured_status, "200 OK")
        # Headers
        num_of_headers = 2 if compressed else 1
        self.assertEqual(len(self.captured_headers), num_of_headers)
        self.assertIn(("Content-Type", CONTENT_TYPE_LATEST), self.captured_headers)
        if compressed:
            self.assertIn(("Content-Encoding", "gzip"), self.captured_headers)
        # Body
        self.assertIn("# HELP " + metric_name + "_total " + help_text + "\n", output)
        self.assertIn("# TYPE " + metric_name + "_total counter\n", output)
        self.assertIn(metric_name + "_total " + str(increments) + ".0\n", output)

    def validate_metrics(self, metric_name, help_text, increments):
        """
        WSGI app serves the metrics from the provided registry.
        """
        self.increment_metrics(metric_name, help_text, increments)
        # Create and run WSGI app
        app = make_wsgi_app(self.registry)
        outputs = app(self.environ, self.capture)
        # Assert outputs
        self.assert_outputs(outputs, metric_name, help_text, increments, compressed=False)

    def test_report_metrics_1(self):
        self.validate_metrics("counter", "A counter", 2)

    def test_report_metrics_2(self):
        self.validate_metrics("counter", "Another counter", 3)

    def test_report_metrics_3(self):
        self.validate_metrics("requests", "Number of requests", 5)

    def test_report_metrics_4(self):
        self.validate_metrics("failed_requests", "Number of failed requests", 7)

    def test_favicon_path(self):
        from unittest.mock import patch

        # Create mock to enable counting access of _bake_output
        with patch("prometheus_client.exposition._bake_output", side_effect=_bake_output) as mock:
            # Create and run WSGI app
            app = make_wsgi_app(self.registry)
            # Try accessing the favicon path
            favicon_environ = dict(self.environ)
            favicon_environ['PATH_INFO'] = '/favicon.ico'
            outputs = app(favicon_environ, self.capture)
            # Test empty response
            self.assertEqual(outputs, [b''])
            self.assertEqual(mock.call_count, 0)
            # Try accessing normal paths
            app(self.environ, self.capture)
            self.assertEqual(mock.call_count, 1)

    def test_gzip(self):
        # Increment a metric
        metric_name = "counter"
        help_text = "A counter"
        increments = 2
        self.increment_metrics(metric_name, help_text, increments)
        app = make_wsgi_app(self.registry)
        # Try accessing metrics using the gzip Accept-Content header.
        gzip_environ = dict(self.environ)
        gzip_environ['HTTP_ACCEPT_ENCODING'] = 'gzip'
        outputs = app(gzip_environ, self.capture)
        # Assert outputs are compressed.
        self.assert_outputs(outputs, metric_name, help_text, increments, compressed=True)

    def test_gzip_disabled(self):
        # Increment a metric
        metric_name = "counter"
        help_text = "A counter"
        increments = 2
        self.increment_metrics(metric_name, help_text, increments)
        # Disable compression explicitly.
        app = make_wsgi_app(self.registry, disable_compression=True)
        # Try accessing metrics using the gzip Accept-Content header.
        gzip_environ = dict(self.environ)
        gzip_environ['HTTP_ACCEPT_ENCODING'] = 'gzip'
        outputs = app(gzip_environ, self.capture)
        # Assert outputs are not compressed.
        self.assert_outputs(outputs, metric_name, help_text, increments, compressed=False)