File: main.py

from typing import Callable, Optional, Sequence, Union

from fastapi import FastAPI
from prometheus_client import REGISTRY, CollectorRegistry, Counter, Histogram, Summary

from prometheus_fastapi_instrumentator import Instrumentator
from prometheus_fastapi_instrumentator.metrics import Info

PING_TOTAL = Counter("ping", "Number of ping calls.")


def my_metrics(
    latency_highr_buckets: Sequence[Union[float, str]] = (
        0.01,
        0.025,
        0.05,
        0.075,
        0.1,
        0.25,
        0.5,
        0.75,
        1,
        1.5,
        2,
        2.5,
        3,
        3.5,
        4,
        4.5,
        5,
        7.5,
        10,
        30,
        60,
    ),
    latency_lowr_buckets: Sequence[Union[float, str]] = (0.1, 0.5, 1),
    registry: CollectorRegistry = REGISTRY,
) -> Optional[Callable[[Info], None]]:
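    """Build the instrumentation closure registered via Instrumentator().add().

    The metric names mirror the library's default metrics, but the label
    names are customized ("my_method", "my_status", "my_handler"). Returns
    None if the metrics are already registered, e.g. after Starlette
    rebuilds the middleware stack.
    """
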
    def is_duplicated_time_series(error: ValueError) -> bool:
        return any(
            map(
                error.args[0].__contains__,
                [
                    "Duplicated timeseries in CollectorRegistry:",
                    "Duplicated time series in CollectorRegistry:",
                ],
            )
        )

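    # Prometheus histogram buckets must end with a +Inf bucket; append it if
    # the caller did not include one.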
    if latency_highr_buckets[-1] != float("inf"):
        latency_highr_buckets = [*latency_highr_buckets, float("inf")]

    if latency_lowr_buckets[-1] != float("inf"):
        latency_lowr_buckets = [*latency_lowr_buckets, float("inf")]

    # Starlette will call app.build_middleware_stack() with every new middleware
    # added, which will call all this again, which will make the registry
    # complain about duplicated metrics.
    #
    # The Python Prometheus client currently doesn't seem to have a way to
    # verify if adding a metric will cause errors or not, so the only way to
    # handle it seems to be with this try block.
    try:
        TOTAL = Counter(
            name="http_requests_total",
            documentation="Total number of requests by method, status and handler.",
            labelnames=(
                "my_method",
                "my_status",
                "my_handler",
            ),
            registry=registry,
        )

        IN_SIZE = Summary(
            name="http_request_size_bytes",
            documentation=(
                "Content length of incoming requests by handler. "
                "Only value of header is respected. Otherwise ignored. "
                "No percentile calculated. "
            ),
            labelnames=("my_handler",),
            registry=registry,
        )

        OUT_SIZE = Summary(
            name="http_response_size_bytes",
            documentation=(
                "Content length of outgoing responses by handler. "
                "Only value of header is respected. Otherwise ignored. "
                "No percentile calculated. "
            ),
            labelnames=("my_handler",),
            registry=registry,
        )

        LATENCY_HIGHR = Histogram(
            name="http_request_duration_highr_seconds",
            documentation=(
                "Latency with many buckets but no API specific labels. "
                "Made for more accurate percentile calculations. "
            ),
            buckets=latency_highr_buckets,
            registry=registry,
        )

        LATENCY_LOWR = Histogram(
            name="http_request_duration_seconds",
            documentation=(
                "Latency with only few buckets by handler. "
                "Made to be only used if aggregation by handler is important. "
            ),
            buckets=latency_lowr_buckets,
            labelnames=(
                "my_method",
                "my_handler",
            ),
            registry=registry,
        )

        def instrumentation(info: Info) -> None:
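            """Record the custom metrics for a single request/response cycle."""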
            TOTAL.labels(info.method, info.modified_status, info.modified_handler).inc()

            IN_SIZE.labels(info.modified_handler).observe(
                int(info.request.headers.get("Content-Length", 0))
            )

            if info.response and hasattr(info.response, "headers"):
                OUT_SIZE.labels(info.modified_handler).observe(
                    int(info.response.headers.get("Content-Length", 0))
                )
            else:
                OUT_SIZE.labels(info.modified_handler).observe(0)

            if info.modified_status.startswith("2"):
                LATENCY_HIGHR.observe(info.modified_duration)

            LATENCY_LOWR.labels(info.method, info.modified_handler).observe(
                info.modified_duration
            )

        return instrumentation

    except ValueError as e:
        if not is_duplicated_time_series(e):
            raise e

    return None


app = FastAPI()

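# instrument() adds the middleware that times each request, add() registers
# the custom metrics closure built above, and expose() mounts a /metrics
# endpoint on the app.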
Instrumentator().instrument(app).add(my_metrics()).expose(app)


@app.get("/ping")
def get_ping():
    PING_TOTAL.inc()
    return "pong"