1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
import requests
import time
import os
import subprocess
from urllib.parse import urlparse
from urllib.parse import urljoin
from prometheus_client.parser import text_string_to_metric_families
from prometheus_client.samples import Sample
from test_helper import ApiTestCase
# TCP port used to reach the Prometheus server (kept as a string because it
# is only ever interpolated into the URL below).
PROMETHEUS_PORT = "9090"

# Base URL used to query the Prometheus HTTP API.
PROMETHEUS_URL = "http://localhost:%s" % (PROMETHEUS_PORT,)

# Configuration file handed to the prometheus binary via --config.file.
PROMETHEUS_CONF = "./prometheus-wforce.yml"
class TestPrometheus(ApiTestCase):
    """Tests for the Prometheus metrics exposed by wforce and trackalert.

    The helpers ``getWforceMetrics``, ``getTrackalertMetrics``, ``allowFunc``,
    ``reportFuncReplica`` and ``trackalertCustomFunc`` are not defined in this
    class; they are presumably inherited from ``ApiTestCase``.
    """

    def parsePrometheusResponse(self, response):
        """Validate a Prometheus text feed and return its samples as a dict.

        Args:
            response: The metrics page body as a string.

        Returns:
            A dict mapping everything before the last space of each
            non-comment line (metric name plus labels) to the text after it
            (the value, still as a string).
        """
        # Parse the text feed and iterate over samples so we make sure
        # the content is well formatted.
        for family in text_string_to_metric_families(response):
            for sample in family.samples:
                self.assertIsInstance(sample, Sample)

        result = {}
        for line in response.split("\n"):
            # Skip blank lines and '#' comment lines.
            if not line or line.startswith('#'):
                continue
            # Label values may contain spaces, so only the last space
            # separates the sample name from its value.
            parts = line.rsplit(" ", 1)
            self.assertEqual(len(parts), 2)
            name, value = parts
            result[name] = value
        return result

    def _parseMetricsReply(self, r):
        """Check that a metrics reply succeeded and return its parsed samples.

        Args:
            r: The reply object returned by one of the metrics getters; it
                must be truthy and expose ``status_code`` and ``text``
                (presumably a ``requests.Response``).
        """
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        return self.parsePrometheusResponse(r.text)

    def _assertNonNegative(self, values, keys):
        """Assert every key is present in ``values`` with a value >= 0."""
        for key in keys:
            self.assertIn(key, values)
            self.assertGreaterEqual(float(values[key]), 0)

    def getWforcePrometheusValues(self):
        """Fetch the wforce metrics and return them as a name -> value dict."""
        return self._parseMetricsReply(self.getWforceMetrics())

    def getTrackalertPrometheusValues(self):
        """Fetch the trackalert metrics and return them as a name -> value dict."""
        return self._parseMetricsReply(self.getTrackalertMetrics())

    def checkWforceValues(self, values):
        """Assert the expected wforce samples exist and are non-negative."""
        self._assertNonNegative(values, (
            'wforce_worker_queue_duration_seconds_bucket{le="0.001"}',
            'wforce_worker_response_duration_seconds_bucket{le="0.001"}',
            'wforce_commands_total{cmd="ping"}',
            'wforce_custom_stats_total{metric="customStat"}',
            'wforce_allow_status_total{status="denied"}',
            'wforce_replication_sent_total{sibling="127.0.0.1:4001",status="ok"}',
            'wforce_replication_rcvd_total{sibling="127.0.0.1",status="ok"}',
            'wforce_replication_tcp_connfailed_total{sibling="127.0.0.1:4001"}',
            'wforce_redis_wlbl_updates_total{type="bl"}',
            'wforce_redis_wlbl_connfailed_total{type="wl"}',
            'wforce_bl_entries{type="iplogin"}',
            'wforce_wl_entries{type="ip"}',
            'wforce_web_queue_size',
            'wforce_webhook_queue_size',
        ))

    def checkTrackalertValues(self, values):
        """Assert the expected trackalert samples exist and are non-negative."""
        self._assertNonNegative(values, (
            'trackalert_worker_queue_duration_seconds_bucket{le="0.001"}',
            'trackalert_worker_response_duration_seconds_bucket{le="0.001"}',
            'trackalert_active_http_connections',
            'trackalert_commands_total{cmd="report"}',
        ))

    def test_WforceMetrics(self):
        """The allow and replication-received counters grow after new traffic."""
        values = self.getWforcePrometheusValues()
        self.checkWforceValues(values)
        allow_count = float(values['wforce_commands_total{cmd="allow"}'])
        repl_rcvd_count = float(values['wforce_replication_rcvd_total{sibling="127.0.0.1",status="ok"}'])

        # add one message before checking stats again
        self.allowFunc("foobar", "99.22.33.11", "1234")
        self.reportFuncReplica("foobar", "99.22.33.11", "1234", False)
        # NOTE(review): presumably gives the replicated report time to be
        # received and counted before the metrics are read again.
        time.sleep(1)

        values = self.getWforcePrometheusValues()
        self.assertGreater(float(values['wforce_commands_total{cmd="allow"}']), allow_count)
        self.assertGreater(float(values['wforce_worker_response_duration_seconds_bucket{le="+Inf"}']), 0)
        self.assertGreater(float(values['wforce_replication_rcvd_total{sibling="127.0.0.1",status="ok"}']), repl_rcvd_count)

    def test_TrackalertMetrics(self):
        """The custom command counter grows after a custom command is sent."""
        values = self.getTrackalertPrometheusValues()
        self.checkTrackalertValues(values)
        custom_count = float(values['trackalert_commands_total{cmd="custom"}'])

        # add one message before checking stats again
        self.trackalertCustomFunc("foobar")

        values = self.getTrackalertPrometheusValues()
        self.assertGreater(float(values['trackalert_commands_total{cmd="custom"}']), custom_count)
        self.assertGreater(float(values['trackalert_worker_response_duration_seconds_bucket{le="+Inf"}']), 0)

    def test_RealPrometheus(self):
        """A real prometheus server ends up knowing ``wforce_commands_total``.

        Starts the ``prometheus`` binary with ``PROMETHEUS_CONF`` and polls
        its label-values API until the metric name shows up, retrying up to
        ``prometheus_tries`` times with a 4 second pause between attempts.
        The server process is always terminated afterwards.
        """
        cmd = ["prometheus", "--config.file=%s" % PROMETHEUS_CONF]
        prometheus = subprocess.Popen(cmd, close_fds=True)
        prometheus_tries = 20
        # Per-attempt HTTP timeout in seconds, so a stalled connection is
        # retried instead of blocking the test forever.
        request_timeout = 10
        url = '%s/api/v1/label/__name__/values' % PROMETHEUS_URL
        try:
            for attempt in range(prometheus_tries + 1):
                try:
                    r = requests.get(url, timeout=request_timeout)
                    # assertIn (unlike a plain assert) still runs under -O.
                    self.assertIn('wforce_commands_total', r.text)
                    break
                except Exception:
                    # Connection errors and failed assertions are retried;
                    # KeyboardInterrupt/SystemExit deliberately propagate.
                    if attempt == prometheus_tries:
                        raise
                    time.sleep(4)
        finally:
            prometheus.terminate()
            prometheus.wait()
|