1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
from __future__ import absolute_import
import threading
import time
from kafka.errors import QuotaViolationError
from kafka.metrics import KafkaMetric
class Sensor(object):
"""
A sensor applies a continuous sequence of numerical values
to a set of associated metrics. For example a sensor on
message size would record a sequence of message sizes using
the `record(double)` api and would maintain a set
of metrics about request sizes such as the average or max.
"""
def __init__(self, registry, name, parents, config,
inactive_sensor_expiration_time_seconds):
if not name:
raise ValueError('name must be non-empty')
self._lock = threading.RLock()
self._registry = registry
self._name = name
self._parents = parents or []
self._metrics = []
self._stats = []
self._config = config
self._inactive_sensor_expiration_time_ms = (
inactive_sensor_expiration_time_seconds * 1000)
self._last_record_time = time.time() * 1000
self._check_forest(set())
def _check_forest(self, sensors):
"""Validate that this sensor doesn't end up referencing itself."""
if self in sensors:
raise ValueError('Circular dependency in sensors: %s is its own'
'parent.' % (self.name,))
sensors.add(self)
for parent in self._parents:
parent._check_forest(sensors)
@property
def name(self):
"""
The name this sensor is registered with.
This name will be unique among all registered sensors.
"""
return self._name
@property
def metrics(self):
return tuple(self._metrics)
def record(self, value=1.0, time_ms=None):
"""
Record a value at a known time.
Arguments:
value (double): The value we are recording
time_ms (int): A POSIX timestamp in milliseconds.
Default: The time when record() is evaluated (now)
Raises:
QuotaViolationException: if recording this value moves a
metric beyond its configured maximum or minimum bound
"""
if time_ms is None:
time_ms = time.time() * 1000
self._last_record_time = time_ms
with self._lock: # XXX high volume, might be performance issue
# increment all the stats
for stat in self._stats:
stat.record(self._config, value, time_ms)
self._check_quotas(time_ms)
for parent in self._parents:
parent.record(value, time_ms)
def _check_quotas(self, time_ms):
"""
Check if we have violated our quota for any metric that
has a configured quota
"""
for metric in self._metrics:
if metric.config and metric.config.quota:
value = metric.value(time_ms)
if not metric.config.quota.is_acceptable(value):
raise QuotaViolationError("'%s' violated quota. Actual: "
"%d, Threshold: %d" %
(metric.metric_name,
value,
metric.config.quota.bound))
def add_compound(self, compound_stat, config=None):
"""
Register a compound statistic with this sensor which
yields multiple measurable quantities (like a histogram)
Arguments:
stat (AbstractCompoundStat): The stat to register
config (MetricConfig): The configuration for this stat.
If None then the stat will use the default configuration
for this sensor.
"""
if not compound_stat:
raise ValueError('compound stat must be non-empty')
self._stats.append(compound_stat)
for named_measurable in compound_stat.stats():
metric = KafkaMetric(named_measurable.name, named_measurable.stat,
config or self._config)
self._registry.register_metric(metric)
self._metrics.append(metric)
def add(self, metric_name, stat, config=None):
"""
Register a metric with this sensor
Arguments:
metric_name (MetricName): The name of the metric
stat (AbstractMeasurableStat): The statistic to keep
config (MetricConfig): A special configuration for this metric.
If None use the sensor default configuration.
"""
with self._lock:
metric = KafkaMetric(metric_name, stat, config or self._config)
self._registry.register_metric(metric)
self._metrics.append(metric)
self._stats.append(stat)
def has_expired(self):
"""
Return True if the Sensor is eligible for removal due to inactivity.
"""
return ((time.time() * 1000 - self._last_record_time) >
self._inactive_sensor_expiration_time_ms)
|