1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
|
import copy
from prometheus_client import REGISTRY
# Failure text for the absolute-value assertions (assert_metric_equal and
# assert_metric_not_equal). Placeholders, in order: metric name, formatted
# labels, actual value, expected value, metric name again, and the formatted
# list of every sample recorded under that metric name.
METRIC_EQUALS_ERR_EXPLANATION = """
%s%s = %s, expected %s.
The values for %s are:
%s"""
# Failure text for diff-based assertions such as assert_metric_diff.
# Placeholders, in order: metric name, formatted labels, observed change,
# expected change, value before, value after. The two changes are rendered
# with %f, so they have to be numbers.
METRIC_DIFF_ERR_EXPLANATION = """
%s%s changed by %f, expected %f.
Value before: %s
Value after: %s
"""
# Failure text for assert_metric_compare, raised when the caller-supplied
# predicate does not accept the (before, after) pair. Placeholders, in order:
# metric name, formatted labels, value before, value after.
METRIC_COMPARE_ERR_EXPLANATION = """
The change in value of %s%s didn't match the predicate.
Value before: %s
Value after: %s
"""
# Failure text used when the metric cannot be found (its current value is
# None) at the time a before/after assertion runs. Placeholders, in order:
# metric name, formatted labels, value before, value after.
METRIC_DIFF_ERR_NONE_EXPLANATION = """
%s%s was None after.
Value before: %s
Value after: %s
"""
"""A collection of utilities that make it easier to write test cases
that interact with metrics.
"""
def assert_metric_equal(expected_value, metric_name, registry=REGISTRY, **labels):
    """Asserts that metric_name{**labels} == expected_value.

    The sample is looked up in `registry` (the global REGISTRY by default)
    and has to match `labels` exactly. If no sample matches, the looked-up
    value is None and is compared against `expected_value` as such.

    Raises AssertionError when the values differ; the message lists every
    sample recorded under `metric_name` in the same registry to help spot
    a wrong label set.
    """
    value = get_metric(metric_name, registry=registry, **labels)
    assert_err = METRIC_EQUALS_ERR_EXPLANATION % (
        metric_name,
        format_labels(labels),
        value,
        expected_value,
        metric_name,
        # Report the samples of the registry under test rather than always
        # falling back to the global default registry.
        format_vector(get_metrics_vector(metric_name, registry=registry)),
    )
    assert expected_value == value, assert_err
def assert_metric_diff(frozen_registry, expected_diff, metric_name, registry=REGISTRY, **labels):
    """Checks that metric_name{**labels} moved by exactly expected_diff
    since frozen_registry was captured.

    frozen_registry is the snapshot returned by save_registry, usually
    taken at the start of a test. A metric that was absent from the
    snapshot counts as having started at 0.0; a metric that is absent
    now fails the assertion outright.
    """
    before = get_metric_from_frozen_registry(metric_name, frozen_registry, **labels)
    after = get_metric(metric_name, registry=registry, **labels)

    missing_details = (metric_name, format_labels(labels), before, after)
    assert after is not None, METRIC_DIFF_ERR_NONE_EXPLANATION % missing_details

    # A missing (None) starting value is treated as zero.
    baseline = before if before else 0.0
    delta = after - baseline

    diff_details = (
        metric_name,
        format_labels(labels),
        delta,
        expected_diff,
        before,
        after,
    )
    message = METRIC_DIFF_ERR_EXPLANATION % diff_details
    assert expected_diff == delta, message
def assert_metric_no_diff(frozen_registry, expected_diff, metric_name, registry=REGISTRY, **labels):
    """Asserts that metric_name{**labels} did NOT change by expected_diff
    between the frozen registry and now.

    Any other change (including no change at all, unless expected_diff is 0)
    passes. A frozen registry can be obtained by calling save_registry,
    typically at the beginning of a test case. A metric missing from the
    frozen registry is treated as having started at 0.0.

    Raises AssertionError when the metric is currently missing (its value
    is None) or when it changed by exactly expected_diff.
    """
    saved_value = get_metric_from_frozen_registry(metric_name, frozen_registry, **labels)
    current_value = get_metric(metric_name, registry=registry, **labels)
    assert current_value is not None, METRIC_DIFF_ERR_NONE_EXPLANATION % (
        metric_name,
        format_labels(labels),
        saved_value,
        current_value,
    )
    diff = current_value - (saved_value or 0.0)
    # The shared diff template would read "changed by X, expected X" here,
    # which looks like a success; state the negated expectation explicitly.
    assert_err = (
        "\n%s%s changed by %f, which is exactly the change that was expected not to happen.\n"
        "Value before: %s\n"
        "Value after: %s\n"
    ) % (
        metric_name,
        format_labels(labels),
        diff,
        saved_value,
        current_value,
    )
    assert expected_diff != diff, assert_err
def assert_metric_not_equal(expected_value, metric_name, registry=REGISTRY, **labels):
    """Asserts that metric_name{**labels} != expected_value.

    The sample is looked up in `registry` (the global REGISTRY by default)
    and has to match `labels` exactly. If no sample matches, the looked-up
    value is None, so the assertion passes for any expected_value other
    than None.

    Raises AssertionError when the metric currently equals expected_value;
    the message lists every sample recorded under `metric_name` in the
    same registry.
    """
    value = get_metric(metric_name, registry=registry, **labels)
    assert_err = METRIC_EQUALS_ERR_EXPLANATION % (
        metric_name,
        format_labels(labels),
        value,
        # The shared template ends in "expected %s"; spell out the negation
        # so a failure does not read like "x = 5, expected 5".
        f"anything but {expected_value}",
        metric_name,
        # Report the samples of the registry under test rather than always
        # falling back to the global default registry.
        format_vector(get_metrics_vector(metric_name, registry=registry)),
    )
    assert expected_value != value, assert_err
def assert_metric_compare(frozen_registry, predicate, metric_name, registry=REGISTRY, **labels):
    """Checks the before/after values of metric_name{**labels} against a
    caller-supplied predicate.

    predicate is called as predicate(value_before, value_after), where
    value_before comes from frozen_registry (see save_registry) and
    value_after from the live registry. It must return the literal True
    for the assertion to pass; other truthy results are rejected. The
    assertion also fails if the metric is currently absent.
    """
    before = get_metric_from_frozen_registry(metric_name, frozen_registry, **labels)
    after = get_metric(metric_name, registry=registry, **labels)

    assert after is not None, METRIC_DIFF_ERR_NONE_EXPLANATION % (
        metric_name,
        format_labels(labels),
        before,
        after,
    )

    verdict = predicate(before, after)
    assert verdict is True, METRIC_COMPARE_ERR_EXPLANATION % (
        metric_name,
        format_labels(labels),
        before,
        after,
    )
def save_registry(registry=REGISTRY):
    """Takes a snapshot of a registry so that later changes to a metric
    can be asserted on, rather than its absolute value.

    The snapshot is a deep copy of everything registry.collect() yields,
    so it no longer shares objects with what the registry returned.
    Typical usage:

        registry = save_registry()
        doStuff()
        assert_metric_diff(registry, 1, 'stuff_done_total')
    """
    collected = list(registry.collect())
    return copy.deepcopy(collected)
def get_metric(metric_name, registry=REGISTRY, **labels):
    """Looks up the current value of metric_name{**labels} in a live
    registry by collecting it and searching the collected samples.
    """
    collected = registry.collect()
    return get_metric_from_frozen_registry(metric_name, collected, **labels)
def get_metrics_vector(metric_name, registry=REGISTRY):
    """Collects the registry and returns every sample recorded under
    metric_name, whatever its labels.

    The result is a list of (labels, value) tuples, with `labels` being
    a dict.

    This leans on how prometheus_client represents samples internally,
    which makes it something of a hack; ideally the client library
    would offer this itself.
    """
    collected = registry.collect()
    return get_metric_vector_from_frozen_registry(metric_name, collected)
def get_metric_vector_from_frozen_registry(metric_name, frozen_registry):
    """Same as get_metrics_vector, but reads from an already collected
    (frozen) registry.

    Returns a list of (labels, value) tuples, in iteration order, for
    every sample whose name equals metric_name.
    """
    return [
        (sample[1], sample[2])
        for family in frozen_registry
        for sample in family.samples
        if sample[0] == metric_name
    ]
def get_metric_from_frozen_registry(metric_name, frozen_registry, **labels):
    """Returns the value of the first sample in a frozen registry whose
    name is metric_name and whose labels equal `labels` exactly.

    Returns None when no sample matches.
    """
    matching_values = (
        sample[2]
        for family in frozen_registry
        for sample in family.samples
        if sample[0] == metric_name and sample[1] == labels
    )
    # The generator is lazy, so the search stops at the first hit.
    return next(matching_values, None)
def format_labels(labels):
    """Renders a dict of labels the way Prometheus writes a label set.

    For example {'method': 'GET', 'port': '80'} becomes
    '{method="GET",port="80"}', and an empty dict becomes '{}'.
    Pairs are emitted in the dict's iteration order.
    """
    rendered_pairs = (f'{k}="{v}"' for k, v in labels.items())
    return "{{{}}}".format(",".join(rendered_pairs))
def format_vector(vector):
    """Turns a list of (labels, value) tuples, with labels given as a
    dict, into readable text: one "<labels> = <value>" line per entry.
    """
    rendered = []
    for labels, value in vector:
        rendered.append(f"{format_labels(labels)} = {value}")
    return "\n".join(rendered)
|