1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
import os
from fastapi import FastAPI
from prometheus_client import (
CONTENT_TYPE_LATEST,
CollectorRegistry,
Counter,
generate_latest,
multiprocess,
)
from starlette.responses import Response
# Fail fast at import time when multiprocess metrics cannot work.
# The multiprocess mode of prometheus_client is driven by the directory named
# in PROMETHEUS_MULTIPROC_DIR, so the variable has to be set, non-empty, and
# point at a directory that already exists. Previously only the presence of
# the variable was checked, so an empty value or a missing directory slipped
# through this guard despite the wording of the error message.
# NOTE(review): the message also asks for an *empty* directory; emptiness is
# deliberately not enforced here because other worker processes may already
# have written into it -- confirm the deployment clears it before startup.
_multiproc_dir = os.environ.get("PROMETHEUS_MULTIPROC_DIR")
if not _multiproc_dir or not os.path.isdir(_multiproc_dir):
    raise ValueError("PROMETHEUS_MULTIPROC_DIR must be set to existing empty dir.")
# Module-level counters, one per thing this service counts. They are created
# once when the module is executed and incremented by the handlers below.
PING_TOTAL = Counter(name="ping", documentation="Number of pings calls.")
METRICS_TOTAL = Counter(name="metrics", documentation="Number of metrics calls.")
MAIN_TOTAL = Counter(name="main", documentation="Counts of main executions.")

# Runs at module execution time, so every execution of this module
# bumps the "main" counter exactly once.
MAIN_TOTAL.inc()

# The ASGI application object the route decorators below attach to.
app = FastAPI()
# GET /ping: record the call in PING_TOTAL and answer with the fixed body
# "pong".
# NOTE: documented with comments rather than a docstring/return annotation on
# purpose -- FastAPI exposes both in the generated OpenAPI schema.
@app.get("/ping")
def get_ping():
    PING_TOTAL.inc()
    reply = "pong"
    return reply
# GET /metrics: record the call in METRICS_TOTAL, then render the metrics
# gathered by a MultiProcessCollector in the Prometheus text exposition
# format.
# NOTE: documented with comments rather than a docstring/return annotation on
# purpose -- FastAPI exposes both in the generated OpenAPI schema.
@app.get("/metrics")
def get_metrics():
    METRICS_TOTAL.inc()

    # A throwaway registry is built for every request. This is the pattern the
    # Prometheus client library documentation prescribes for multiprocess
    # mode, and it comes with several caveats. A long-lived registry can look
    # fine at first glance but causes problems later; PFI used a persistent
    # registry for a long time, and that was wrong.
    scrape_registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(scrape_registry)

    payload = generate_latest(scrape_registry)
    response = Response(content=payload)
    # Set the header after construction so the exact exposition content type
    # is sent as-is.
    response.headers["Content-Type"] = CONTENT_TYPE_LATEST
    return response
|