1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
import logging
import os
import threading
import time
import traceback
import gunicorn
import gunicorn.app.base
from flask import Flask
from prometheus_client import make_wsgi_app
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from pebble import concurrent
from prometheus_client.core import REGISTRY, InfoMetricFamily
from concurrent.futures import TimeoutError
from prometheus_flask_exporter.multiprocess import GunicornInternalPrometheusMetrics
from prometheus_client import Counter
class StandaloneApplication(gunicorn.app.base.BaseApplication):
    """Gunicorn application that serves an already-constructed WSGI app.

    Gunicorn settings are taken from a plain dict handed to the constructor.
    """

    def __init__(self, app, options=None):
        """Store the WSGI app and its settings, then run the base initialiser.

        Args:
            app: WSGI callable that ``load`` hands back.
            options: Mapping of setting name to value. ``None`` (or any
                other falsy value) is treated as an empty mapping.
        """
        # Both attributes are set before the base initialiser runs.
        # NOTE(review): presumably because it triggers load_config() --
        # confirm against the gunicorn BaseApplication implementation.
        self.options = options or {}
        self.application = app
        super().__init__()

    def load_config(self):
        """Copy every recognised, non-``None`` option into ``self.cfg``."""
        known_settings = self.cfg.settings
        # Filter first, apply second: unknown names and None values are
        # dropped before anything is written to the config object.
        applicable = [
            (name, setting)
            for name, setting in self.options.items()
            if name in known_settings and setting is not None
        ]
        for name, setting in applicable:
            self.cfg.set(name.lower(), setting)

    def load(self):
        """Return the WSGI application given to the constructor."""
        return self.application
class CustomCollector:
    """Prometheus collector that exposes a single static info metric."""

    def collect(self):
        """Yield one ``InfoMetricFamily`` describing this process.

        Every value is a fixed placeholder string except ``pid``, which is
        read from ``os.getpid()`` on each call.

        Yields:
            The populated ``InfoMetricFamily``.
        """
        info = InfoMetricFamily('xxxx', 'xxxxxx')
        # `labels` is the list of label *values* matching the family's label
        # names. The family above declares no label names, so the list must
        # be empty; the key/value pairs themselves travel in `value`.
        info.add_metric(
            labels=[],
            value={
                'version': 'xxxxx',
                'loglevel': 'xxx',
                'root': 'xxxx',
                'workers': 'xxxx',
                'ip': 'xxxxx',
                'port': 'xxx',
                'config_name': 'xxxx',
                'mode': 'xx',
                'debug': 'xxx',
                'node': 'xxx',
                'pod': 'xxx',
                'pid': str(os.getpid()),
            },
        )
        yield info
# Counter updated by add_metric_thread(); labelled by pod, node and mode.
thread_sum = Counter(
    name='thread_count',
    documentation='Total count of the thread application.',
    labelnames=['pod', 'node', 'mode'],
)
def add_metric_thread(count=False):
    """Look up the labelled ``thread_sum`` child and optionally increment it.

    Args:
        count: Amount to add to the counter. Any falsy value (the default
            ``False``, ``0``, ``None``) skips the increment, so the call
            only performs the ``labels(...)`` lookup.
    """
    # The label values are fixed strings, identical for every call.
    child = thread_sum.labels(mode='mode', node='NODE', pod='POD')
    if count:
        child.inc(count)
def when_ready(server):
    """Call ``start_http_server_when_ready`` on the exporter for port 8080.

    Args:
        server: Accepted but not used by this function.
    """
    # NOTE(review): the signature matches gunicorn's `when_ready` hook, but
    # the options dict in the __main__ block does not register it -- confirm
    # how this is meant to be wired.
    metrics_port = 8080
    GunicornInternalPrometheusMetrics.start_http_server_when_ready(metrics_port)
def child_exit(server, worker):
    """Pass the exited worker's pid to ``mark_process_dead_on_child_exit``.

    Args:
        server: Accepted but not used by this function.
        worker: Object whose ``pid`` attribute identifies the exited worker.
    """
    exited_pid = worker.pid
    GunicornInternalPrometheusMetrics.mark_process_dead_on_child_exit(exited_pid)
def thread_function():
    """Run ``job`` in an endless loop, logging any failure.

    Each iteration sleeps 10 seconds, starts ``job`` through pebble's
    ``concurrent.process`` decorator (timeout 300 seconds) and blocks on its
    result. Errors are logged and the loop continues; this function never
    returns.
    """

    @concurrent.process(timeout=300)
    def job():
        """Feed the active thread count to the metric, then sleep 20 s."""
        open_threads = threading.active_count()
        add_metric_thread(open_threads)
        print(f'How thread Open .: {open_threads}')
        print(f'run_threaded - {threading.current_thread()}')
        time.sleep(20)

    while True:
        time.sleep(10)
        future = job()
        try:
            future.result()  # blocks until results are ready
        except TimeoutError as error:
            # A TimeoutError may carry fewer than two args. Indexing args[1]
            # unconditionally would raise IndexError inside this handler and
            # end the loop, so fall back to the exception itself.
            detail = error.args[1] if len(error.args) > 1 else error
            logging.error('Job timeout of 5 minute %s', detail)
        except Exception:
            # Boundary of the background loop: log the traceback and go on.
            logging.error(' job - %s', traceback.format_exc())
def init():
    """Start the background thread and touch the thread counter.

    Prints a start message, launches ``thread_function`` on a daemon thread,
    then calls ``add_metric_thread()`` with no count.
    """
    print('Thread - starting')
    threading.Thread(target=thread_function, daemon=True).start()
    add_metric_thread()
def create_app():
    """Build the Flask application with metrics, background thread and route.

    Returns:
        The configured ``Flask`` instance.
    """
    flask_app = Flask(__name__)
    metrics.init_app(flask_app)

    # Add prometheus wsgi middleware to route /metrics requests
    mounts = {'/metrics': make_wsgi_app(registry=REGISTRY)}
    flask_app.wsgi_app = DispatcherMiddleware(flask_app.wsgi_app, mounts)

    # Starts the background thread as a side effect of building the app.
    init()

    @flask_app.route('/test')
    def main():
        """Return a fixed ``'Ok'`` body for ``/test``."""
        return 'Ok'

    return flask_app
# Register the static info collector on the shared REGISTRY at import time.
REGISTRY.register(CustomCollector())

# Exporter created without an app; create_app() binds it via init_app().
metrics = GunicornInternalPrometheusMetrics.for_app_factory(
    path='/metrics',
    static_labels=dict(node='xxx', pod='xx', version='xx'),
    registry=REGISTRY,
)
if __name__ == '__main__':
    # Settings passed to StandaloneApplication for the embedded server.
    options = dict(
        bind=['0.0.0.0:9200'],
        workers=4,
        loglevel='debug',
    )
    wsgi_app = create_app()
    std_app = StandaloneApplication(wsgi_app, options)
    std_app.run()
|