1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
|
from __future__ import unicode_literals
import sys
import threading
if sys.version_info < (2, 7):
# We need the skip decorators from unittest2 on Python 2.6.
import unittest2 as unittest
else:
import unittest
from prometheus_client import Gauge, Counter, Summary, Histogram, Metric
from prometheus_client import CollectorRegistry, generate_latest
from prometheus_client import push_to_gateway, pushadd_to_gateway, delete_from_gateway
from prometheus_client import CONTENT_TYPE_LATEST, instance_ip_grouping_key
try:
from BaseHTTPServer import BaseHTTPRequestHandler
from BaseHTTPServer import HTTPServer
except ImportError:
# Python 3
from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer
class TestGenerateText(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
def test_counter(self):
c = Counter('cc', 'A counter', registry=self.registry)
c.inc()
self.assertEqual(b'# HELP cc A counter\n# TYPE cc counter\ncc 1.0\n', generate_latest(self.registry))
def test_gauge(self):
g = Gauge('gg', 'A gauge', registry=self.registry)
g.set(17)
self.assertEqual(b'# HELP gg A gauge\n# TYPE gg gauge\ngg 17.0\n', generate_latest(self.registry))
def test_summary(self):
s = Summary('ss', 'A summary', ['a', 'b'], registry=self.registry)
s.labels('c', 'd').observe(17)
self.assertEqual(b'# HELP ss A summary\n# TYPE ss summary\nss_count{a="c",b="d"} 1.0\nss_sum{a="c",b="d"} 17.0\n', generate_latest(self.registry))
@unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.")
def test_histogram(self):
s = Histogram('hh', 'A histogram', registry=self.registry)
s.observe(0.05)
self.assertEqual(b'''# HELP hh A histogram
# TYPE hh histogram
hh_bucket{le="0.005"} 0.0
hh_bucket{le="0.01"} 0.0
hh_bucket{le="0.025"} 0.0
hh_bucket{le="0.05"} 1.0
hh_bucket{le="0.075"} 1.0
hh_bucket{le="0.1"} 1.0
hh_bucket{le="0.25"} 1.0
hh_bucket{le="0.5"} 1.0
hh_bucket{le="0.75"} 1.0
hh_bucket{le="1.0"} 1.0
hh_bucket{le="2.5"} 1.0
hh_bucket{le="5.0"} 1.0
hh_bucket{le="7.5"} 1.0
hh_bucket{le="10.0"} 1.0
hh_bucket{le="+Inf"} 1.0
hh_count 1.0
hh_sum 0.05
''', generate_latest(self.registry))
def test_unicode(self):
c = Counter('cc', '\u4500', ['l'], registry=self.registry)
c.labels('\u4500').inc()
self.assertEqual(b'# HELP cc \xe4\x94\x80\n# TYPE cc counter\ncc{l="\xe4\x94\x80"} 1.0\n', generate_latest(self.registry))
def test_escaping(self):
c = Counter('cc', 'A\ncount\\er', ['a'], registry=self.registry)
c.labels('\\x\n"').inc(1)
self.assertEqual(b'# HELP cc A\\ncount\\\\er\n# TYPE cc counter\ncc{a="\\\\x\\n\\""} 1.0\n', generate_latest(self.registry))
def test_nonnumber(self):
class MyNumber():
def __repr__(self):
return "MyNumber(123)"
def __float__(self):
return 123.0
class MyCollector():
def collect(self):
metric = Metric("nonnumber", "Non number", 'untyped')
metric.add_sample("nonnumber", {}, MyNumber())
yield metric
self.registry.register(MyCollector())
self.assertEqual(b'# HELP nonnumber Non number\n# TYPE nonnumber untyped\nnonnumber 123.0\n', generate_latest(self.registry))
class TestPushGateway(unittest.TestCase):
def setUp(self):
self.registry = CollectorRegistry()
self.counter = Gauge('g', 'help', registry=self.registry)
self.requests = requests = []
class TestHandler(BaseHTTPRequestHandler):
def do_PUT(self):
self.send_response(201)
length = int(self.headers['content-length'])
requests.append((self, self.rfile.read(length)))
self.end_headers()
do_POST = do_PUT
do_DELETE = do_PUT
httpd = HTTPServer(('localhost', 0), TestHandler)
self.address = ':'.join([str(x) for x in httpd.server_address])
class TestServer(threading.Thread):
def run(self):
httpd.handle_request()
self.server = TestServer()
self.server.daemon = True
self.server.start()
def test_push(self):
push_to_gateway(self.address, "my_job", self.registry)
self.assertEqual(self.requests[0][0].command, 'PUT')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
def test_push_with_groupingkey(self):
push_to_gateway(self.address, "my_job", self.registry, {'a': 9})
self.assertEqual(self.requests[0][0].command, 'PUT')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job/a/9')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
def test_push_with_complex_groupingkey(self):
push_to_gateway(self.address, "my_job", self.registry, {'a': 9, 'b': 'a/ z'})
self.assertEqual(self.requests[0][0].command, 'PUT')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job/a/9/b/a%2F+z')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
def test_pushadd(self):
pushadd_to_gateway(self.address, "my_job", self.registry)
self.assertEqual(self.requests[0][0].command, 'POST')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
def test_pushadd_with_groupingkey(self):
pushadd_to_gateway(self.address, "my_job", self.registry, {'a': 9})
self.assertEqual(self.requests[0][0].command, 'POST')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job/a/9')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
def test_delete(self):
delete_from_gateway(self.address, "my_job")
self.assertEqual(self.requests[0][0].command, 'DELETE')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'')
def test_delete_with_groupingkey(self):
delete_from_gateway(self.address, "my_job", {'a': 9})
self.assertEqual(self.requests[0][0].command, 'DELETE')
self.assertEqual(self.requests[0][0].path, '/metrics/job/my_job/a/9')
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
self.assertEqual(self.requests[0][1], b'')
@unittest.skipIf(
sys.platform == "darwin",
"instance_ip_grouping_key() does not work on macOS."
)
def test_instance_ip_grouping_key(self):
self.assertTrue('' != instance_ip_grouping_key()['instance'])
if __name__ == '__main__':
unittest.main()
|