File: __init__.py

package info (click to toggle)
python-aiohttp-openmetrics 0.0.11-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 92 kB
  • sloc: python: 119; makefile: 2
file content (163 lines) | stat: -rw-r--r-- 4,651 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# Version of this module as a tuple of integers.
__version__ = (0, 0, 11)

# Public names exported by a star-import of this module.  Every public
# callable defined or re-exported below is listed, including the
# ``push_to_gateway`` coroutine.
__all__ = [
    'metrics_middleware',
    'metrics',
    'setup_metrics',
    'Counter',
    'Gauge',
    'Histogram',
    'REGISTRY',
    'run_prometheus_server',
    'push_to_gateway',
    ]

import asyncio
import base64
import time
from typing import Optional, Dict
from urllib.parse import quote_plus

from aiohttp import web
from aiohttp.client import ClientSession, ClientTimeout
from yarl import URL

from prometheus_client.metrics import (
    Counter,
    Gauge,
    Histogram,
)
from prometheus_client.exposition import (
    generate_latest,
    CONTENT_TYPE_LATEST,
)
from prometheus_client.registry import (
    REGISTRY,
)

# Requests that produced a response or an HTTP exception, by status.
request_counter = Counter(
    name="requests_total",
    documentation="Total Request Count",
    labelnames=["method", "route", "status"],
)

# Requests whose handler raised ConnectionResetError.
request_connection_reset_counter = Counter(
    name="requests_connection_reset_total",
    documentation="Total Number of Requests where the connection was reset",
    labelnames=["method", "route"],
)

# Requests whose handler was cancelled (asyncio.CancelledError).
request_cancelled_counter = Counter(
    name="requests_cancelled_total",
    documentation="Total Number of Requests that were cancelled",
    labelnames=["method", "route"],
)

# Time spent handling each request, in seconds, per route.
request_latency_hist = Histogram(
    name="request_latency_seconds",
    documentation="Request latency",
    labelnames=["route"],
)

# Requests that have entered the middleware but not yet left it.
requests_in_progress_gauge = Gauge(
    name="requests_in_progress_total",
    documentation="Requests currently in progress",
    labelnames=["method", "route"],
)

# Requests whose handler raised any other exception.
request_exceptions = Counter(
    name="request_exceptions_total",
    documentation="Total Number of Exceptions during Requests",
    labelnames=["method", "route"],
)


async def metrics(request: web.Request) -> web.Response:
    """Serve the current contents of the default registry.

    Args:
      request: Incoming request (not inspected)
    Returns:
      Response whose body is ``generate_latest`` output for ``REGISTRY``
      and whose content type is ``CONTENT_TYPE_LATEST``
    """
    payload = generate_latest(registry=REGISTRY)
    response = web.Response(body=payload)
    # The content type is assigned on the finished response object rather
    # than being passed to the constructor.
    response.content_type = CONTENT_TYPE_LATEST
    return response


@web.middleware
async def metrics_middleware(request: web.Request, handler) -> web.Response:
    """Record request metrics around a call to ``handler``.

    For every request the in-progress gauge is incremented on entry and
    decremented on exit, and the elapsed handling time is observed in the
    latency histogram, whether the handler returned or raised.  The
    outcome is counted as follows:

    * normal response: ``request_counter`` with the response status
    * ``web.HTTPException``: ``request_counter`` with the exception's
      status code
    * ``ConnectionResetError``: ``request_connection_reset_counter``
    * ``asyncio.CancelledError``: ``request_cancelled_counter``
    * any other ``Exception``: ``request_exceptions``

    Args:
      request: Incoming request
      handler: Next handler in the middleware chain
    Returns:
      The response produced by ``handler``
    Raises:
      Whatever ``handler`` raises; exceptions are counted and re-raised
      unchanged.
    """
    # perf_counter() is monotonic, so the measured duration cannot be
    # skewed (or go negative) when the system clock is adjusted, which
    # can happen with wall-clock time.time().
    start_time = time.perf_counter()
    method = request.method
    # Name of the matched route, used as the "route" label value.
    route = request.match_info.route.name
    in_progress = requests_in_progress_gauge.labels(method, route)
    in_progress.inc()
    try:
        response = await handler(request)
    except web.HTTPException as e:
        # HTTP exceptions carry a status, so count them like responses.
        request_counter.labels(method, route, e.status_code).inc()
        raise
    except ConnectionResetError:
        request_connection_reset_counter.labels(method, route).inc()
        raise
    except asyncio.CancelledError:
        request_cancelled_counter.labels(method, route).inc()
        raise
    except Exception:
        request_exceptions.labels(method, route).inc()
        raise
    finally:
        # Runs on success and on every failure path above.
        resp_time = time.perf_counter() - start_time
        request_latency_hist.labels(route).observe(resp_time)
        in_progress.dec()
    request_counter.labels(method, route, response.status).inc()
    return response


def setup_metrics(app: web.Application):
    """Instrument ``app`` and expose its metrics endpoint.

    Places ``metrics_middleware`` at the front of the application's
    middleware list and registers the ``metrics`` handler as a GET route
    at ``/metrics`` under the route name ``metrics``.

    Args:
      app: Application to instrument
    """
    middleware_chain = app.middlewares
    middleware_chain.insert(0, metrics_middleware)

    router = app.router
    router.add_get("/metrics", metrics, name="metrics")


async def run_prometheus_server(
        listen_addr: str, port: int) -> web.AppRunner:
    """Convenience function to run a web server with metrics only.

    Creates a fresh application, installs the metrics middleware and
    ``/metrics`` route on it, and starts a TCP site for it.  The coroutine
    returns once the site has been started; it does not block while the
    server runs.

    Args:
      listen_addr: Address to listen on
      port: Port to listen on
    Returns:
      The runner that owns the started site.  Await its ``cleanup()``
      method to shut the server down.
    Raises:
      Whatever setting up the runner or starting the site raises (for
      example when the address cannot be bound).
    """
    app = web.Application()
    setup_metrics(app)
    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, listen_addr, port)
        await site.start()
    except BaseException:
        # Do not leak a set-up runner when the site cannot be started.
        await runner.cleanup()
        raise
    # Hand the runner back so the caller is able to stop the server.
    return runner


# _escape_grouping_key imported from prometheus-client @
# https://github.com/prometheus/client_python
def _escape_grouping_key(k, v):
    """Encode one grouping key/value pair for use as URL path segments.

    Args:
      k: Label name
      v: Label value
    Returns:
      A ``(name, value)`` tuple.  An empty value, or a value containing
      ``/``, gets the ``@base64`` suffix on its name; all other values
      are returned with the name unchanged and the value passed through
      ``quote_plus``.
    """
    if v == "":
        # Per https://github.com/prometheus/pushgateway/pull/346.
        return k + "@base64", "="

    if '/' not in v:
        return k, quote_plus(v)

    # Added in Pushgateway 0.9.0.
    encoded = base64.urlsafe_b64encode(v.encode("utf-8"))
    return k + "@base64", encoded.decode("utf-8")


async def push_to_gateway(
        gateway: str, job: str, registry, timeout: int = 30,
        grouping_key: Optional[Dict[str, str]] = None):
    """Send the metrics of a registry to a pushgateway with an HTTP PUT.

    The target URL is ``<gateway>/metrics/job/<job>`` followed by one
    ``/<name>/<value>`` pair per grouping key, in sorted key order.  Each
    pair is encoded with ``_escape_grouping_key`` first.

    Args:
      gateway: URL to the push gateway
      job: Name of the exported job
      registry: Registry to get variables from
      timeout: Timeout in seconds
      grouping_key: Dict with key/values to add
    Raises:
      Whatever the HTTP request raises; the request is made with
      ``raise_for_status=True``.
    """
    job_label, job_value = _escape_grouping_key("job", job)
    target = URL(gateway) / "metrics" / job_label / job_value

    extra_labels = grouping_key or {}
    for name, value in sorted(extra_labels.items()):
        label, escaped = _escape_grouping_key(name, value)
        target = target / label / escaped

    payload = generate_latest(registry)

    # Only the success of the request matters, so the response body is
    # never read.
    async with ClientSession() as session, session.put(
            target,
            timeout=ClientTimeout(timeout),
            headers={'Content-Type': CONTENT_TYPE_LATEST},
            data=payload,
            raise_for_status=True):
        pass