File: server.py

package info (click to toggle)
prometheus-flask-exporter 0.23.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 896 kB
  • sloc: python: 2,889; sh: 709; makefile: 4
file content (139 lines) | stat: -rw-r--r-- 3,921 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import logging
import os
import threading
import time
import traceback
import gunicorn
import gunicorn.app.base

from flask import Flask
from prometheus_client import make_wsgi_app
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from pebble import concurrent
from prometheus_client.core import REGISTRY, InfoMetricFamily
from concurrent.futures import TimeoutError
from prometheus_flask_exporter.multiprocess import GunicornInternalPrometheusMetrics
from prometheus_client import Counter


class StandaloneApplication(gunicorn.app.base.BaseApplication):
    """Gunicorn application wrapper around an already-built WSGI app.

    Settings are supplied as a plain ``options`` mapping instead of a
    gunicorn configuration file.
    """

    def __init__(self, app, options=None):
        """Store the WSGI app and its options, then initialise the base class.

        Args:
            app: The WSGI application object returned by ``load()``.
            options: Optional mapping of gunicorn setting names to values;
                a falsy value is replaced by an empty dict.
        """
        # Both attributes are assigned before the base initialiser runs.
        # NOTE(review): presumably the base class calls load_config() during
        # its own __init__ - confirm against the gunicorn documentation.
        self.application = app
        self.options = options if options else {}
        super().__init__()

    def load_config(self):
        """Copy every recognised, non-``None`` option into ``self.cfg``."""
        known_settings = self.cfg.settings
        for name, setting in self.options.items():
            # Skip unset values and names gunicorn does not know about.
            if setting is None or name not in known_settings:
                continue
            self.cfg.set(name.lower(), setting)

    def load(self):
        """Return the WSGI application given to the constructor."""
        return self.application


class CustomCollector:
    """Custom collector that exposes one info metric with static labels."""

    def collect(self):
        """Yield a single ``InfoMetricFamily`` holding one sample.

        The sample's labels come entirely from the ``value`` mapping below;
        all entries are placeholder strings except ``pid``, which is the id
        of the process running the collection.

        Yields:
            InfoMetricFamily: the populated info metric family.
        """
        info = InfoMetricFamily('xxxx', 'xxxxxx')
        info.add_metric(
            # ``labels`` carries the values for the family's declared label
            # names. The family is created without label names, so the list
            # is empty; a bare string here would be read as a sequence of
            # characters rather than as a label.
            labels=[],
            value={
                'version': 'xxxxx',
                'loglevel': 'xxx',
                'root': 'xxxx',
                'workers': 'xxxx',
                'ip': 'xxxxx',
                'port': 'xxx',
                'config_name': 'xxxx',
                'mode': 'xx',
                'debug': 'xxx',
                'node': 'xxx',
                'pod': 'xxx',
                'pid': str(os.getpid()),
            },
        )
        yield info


# Counter labelled by pod, node and mode. add_metric_thread() increments it by
# the thread count it is handed, so the stored value is a running sum of all
# sampled counts rather than the current number of threads.
# NOTE(review): a Gauge may be what was intended for a "current threads"
# reading - confirm before relying on this series.
thread_sum = Counter('thread_count',
                     'Total count of the thread application.',
                     ['pod', 'node', 'mode'])


def add_metric_thread(count=False):
    """Look up the labelled ``thread_sum`` series and optionally increment it.

    Args:
        count: Amount to add to the counter. Any falsy value (the default
            ``False``, ``0``, ``None``) only looks the labelled series up
            without changing its value.
    """
    series = thread_sum.labels(mode='mode', node='NODE', pod='POD')
    if count:
        series.inc(count)


def when_ready(server):
    """Call ``start_http_server_when_ready(8080)`` on the metrics class.

    Args:
        server: Unused by this function.

    NOTE(review): the name and signature look like gunicorn's ``when_ready``
    server hook, but the options dict built in this file's ``__main__`` block
    does not reference it - confirm where (or whether) it is wired in.
    """
    GunicornInternalPrometheusMetrics.start_http_server_when_ready(8080)


def child_exit(server, worker):
    """Report an exited worker's pid to the metrics class.

    Args:
        server: Unused by this function.
        worker: Object whose ``pid`` attribute is forwarded to
            ``mark_process_dead_on_child_exit``.

    NOTE(review): the name and signature look like gunicorn's ``child_exit``
    server hook, but the options dict built in this file's ``__main__`` block
    does not reference it - confirm where (or whether) it is wired in.
    """
    GunicornInternalPrometheusMetrics.mark_process_dead_on_child_exit(worker.pid)


def thread_function():
    """Loop forever, periodically running a thread-count sampling job.

    Each cycle sleeps 10 seconds, launches ``job`` and blocks on its result.
    A timeout or any other exception raised while waiting is logged and the
    loop carries on. This function never returns.
    """

    # NOTE(review): pebble's ``concurrent.process`` presumably runs ``job`` in
    # a separate process with a 300 second limit - confirm against pebble docs.
    @concurrent.process(timeout=300)
    def job():
        # Feed the active thread count into the metric, print diagnostics,
        # then idle for 20 seconds.
        open_threads = threading.active_count()
        add_metric_thread(open_threads)
        print(f'How thread Open .: {open_threads}')
        print(f'run_threaded - {threading.current_thread()}')
        time.sleep(20)

    def wait_for(pending):
        # Block on one launched job and log, rather than propagate, failures.
        try:
            pending.result()
        except TimeoutError as error:
            logging.error(f'Job timeout of 5 minute {error.args[1]}')
        except Exception:
            logging.error(f' job - {traceback.format_exc()}')

    while True:
        time.sleep(10)
        wait_for(job())


def init():
    """Start the background sampling thread and look up the counter series.

    The thread runs ``thread_function`` as a daemon thread.
    """
    print('Thread - starting')
    threading.Thread(target=thread_function, daemon=True).start()
    # Called without a count: the labelled series is looked up, not incremented.
    add_metric_thread()


def create_app():
    """Build and return the Flask application.

    Binds the module-level ``metrics`` object to the app, mounts a
    prometheus_client WSGI app under ``/metrics``, starts the background
    sampler via ``init()`` and registers the ``/test`` route.

    Returns:
        The configured Flask application.
    """
    flask_app = Flask(__name__)
    metrics.init_app(flask_app)

    # Mount table for the dispatcher: /metrics goes to the prometheus WSGI
    # app, and the original Flask WSGI callable is passed as the main app.
    mounts = {'/metrics': make_wsgi_app(registry=REGISTRY)}
    flask_app.wsgi_app = DispatcherMiddleware(flask_app.wsgi_app, mounts)

    init()

    @flask_app.route('/test')
    def main():
        return 'Ok'

    return flask_app


# Register the custom info collector on the REGISTRY imported from
# prometheus_client.core; this runs as soon as the module is imported.
REGISTRY.register(CustomCollector())

# Module-level metrics object built with the app-factory constructor;
# create_app() binds it to the Flask app through metrics.init_app(app).
# It shares the same REGISTRY and adds the static labels below.
metrics = GunicornInternalPrometheusMetrics.for_app_factory(
    path='/metrics',
    static_labels={'node': 'xxx', 'pod': 'xx', 'version': 'xx'},
    registry=REGISTRY
)

if __name__ == '__main__':
    # Script entry point: build the Flask app and run it under gunicorn,
    # bound to 0.0.0.0:9200 with four workers and debug logging.
    options = dict(
        bind=['0.0.0.0:9200'],
        workers=4,
        loglevel='debug',
    )
    StandaloneApplication(create_app(), options).run()