1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540
|
# Copyright (c) 2016-2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import random
from threading import Lock
from tornado.ioloop import PeriodicCallback
from .constants import (
_max_id_bits,
DEFAULT_SAMPLING_INTERVAL,
SAMPLER_TYPE_CONST,
SAMPLER_TYPE_PROBABILISTIC,
SAMPLER_TYPE_RATE_LIMITING,
SAMPLER_TYPE_LOWER_BOUND,
)
from .metrics import Metrics, LegacyMetricsFactory, MetricsFactory
from .utils import ErrorReporter
from .rate_limiter import RateLimiter
from typing import Any, Dict, Optional, Tuple
# Logger used by RemoteControlledSampler when the caller does not pass one.
default_logger = logging.getLogger('jaeger_tracing')

# Keys of the tags each sampler attaches to its sampling decision.
SAMPLER_TYPE_TAG_KEY = 'sampler.type'
SAMPLER_PARAM_TAG_KEY = 'sampler.param'

# Fallback values used when a strategy (or the caller) does not supply one.
DEFAULT_SAMPLING_PROBABILITY = 0.001
DEFAULT_LOWER_BOUND = 1.0 / (10.0 * 60.0)  # sample once every 10 minutes
DEFAULT_MAX_OPERATIONS = 2000

# Keys read from the sampling strategy dictionaries handled in this module.
STRATEGIES_STR = 'perOperationStrategies'
OPERATION_STR = 'operation'
DEFAULT_LOWER_BOUND_STR = 'defaultLowerBoundTracesPerSecond'
PROBABILISTIC_SAMPLING_STR = 'probabilisticSampling'
SAMPLING_RATE_STR = 'samplingRate'
DEFAULT_SAMPLING_PROBABILITY_STR = 'defaultSamplingProbability'
OPERATION_SAMPLING_STR = 'operationSampling'
MAX_TRACES_PER_SECOND_STR = 'maxTracesPerSecond'
RATE_LIMITING_SAMPLING_STR = 'rateLimitingSampling'
STRATEGY_TYPE_STR = 'strategyType'

# Values of the STRATEGY_TYPE_STR field recognised by RemoteControlledSampler.
PROBABILISTIC_SAMPLING_STRATEGY = 'PROBABILISTIC'
RATE_LIMITING_SAMPLING_STRATEGY = 'RATE_LIMITING'

# Type aliases: the tags dictionary of a sampler, and the
# (decision, tags) pair returned by every `is_sampled` implementation.
_TagsType = Dict[str, Any]
_IsSampledType = Tuple[bool, _TagsType]
class Sampler(object):
    """
    Base class of all samplers.

    A sampler decides whether a particular span should be "sampled",
    i.e. recorded in permanent storage. Subclasses must implement
    `is_sampled` and `close`.
    """

    def __init__(self, tags: Optional[_TagsType] = None) -> None:
        # A missing or empty `tags` argument is replaced by a fresh dict.
        self._tags = tags if tags else {}

    def is_sampled(self, trace_id: int, operation: str = '') -> _IsSampledType:
        """Return a (decision, tags) pair; must be overridden."""
        raise NotImplementedError()

    def close(self) -> None:
        """Release resources held by the sampler; must be overridden."""
        raise NotImplementedError()

    def __eq__(self, other: Any) -> bool:
        """Samplers are equal when `other` is an instance of this sampler's
        class and both carry identical instance attributes."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: Any) -> bool:
        return not self.__eq__(other)
class ConstSampler(Sampler):
    """A sampler whose decision is fixed when it is constructed."""

    def __init__(self, decision: bool) -> None:
        const_tags = {
            SAMPLER_TYPE_TAG_KEY: SAMPLER_TYPE_CONST,
            SAMPLER_PARAM_TAG_KEY: decision,
        }
        super().__init__(tags=const_tags)
        self.decision = decision

    def is_sampled(self, trace_id: int, operation: str = '') -> _IsSampledType:
        """Return the fixed decision and this sampler's tags; both
        arguments are ignored."""
        return self.decision, self._tags

    def close(self):
        """Nothing to release."""
        pass

    def __str__(self) -> str:
        return 'ConstSampler(%s)' % self.decision
class ProbabilisticSampler(Sampler):
    """
    Samples a fixed fraction of traces, given by a rate between 0.0 and 1.0.

    No random number is generated for the decision. Instead the trace ID
    is reduced to its low `_max_id_bits` bits and compared against
    `rate * 2 ** _max_id_bits`; this relies on trace IDs being random
    numbers themselves.
    """

    def __init__(self, rate: float) -> None:
        probabilistic_tags = {
            SAMPLER_TYPE_TAG_KEY: SAMPLER_TYPE_PROBABILISTIC,
            SAMPLER_PARAM_TAG_KEY: rate,
        }
        super().__init__(tags=probabilistic_tags)
        assert 0.0 <= rate <= 1.0, 'Sampling rate must be between 0.0 and 1.0'
        self.rate = rate
        # Exclusive upper limit of the masked trace ID space.
        self.max_number = 1 << _max_id_bits
        # Masked trace IDs strictly below this value are sampled.
        self.boundary = rate * self.max_number

    def is_sampled(self, trace_id: int, operation: str = '') -> _IsSampledType:
        """Return (sampled, tags) for `trace_id`; `operation` is ignored."""
        masked_id = trace_id & (self.max_number - 1)
        return masked_id < self.boundary, self._tags

    def close(self) -> None:
        """Nothing to release."""
        pass

    def __str__(self) -> str:
        return 'ProbabilisticSampler(%s)' % self.rate
class RateLimitingSampler(Sampler):
    """
    Samples at most max_traces_per_second traces.

    Sampled traces follow the burstiness of the service: uniformly
    distributed requests are sampled uniformly, whereas bursty requests
    (especially sub-second bursts) can lead to a number of sequential
    requests being sampled within the same second.
    """

    def __init__(self, max_traces_per_second: float = 10) -> None:
        super().__init__()
        self.rate_limiter: RateLimiter = None  # type:ignore  # value is set below
        self._init(max_traces_per_second)

    def _init(self, max_traces_per_second):
        """(Re)configure tags and the rate limiter for the given rate.

        Called from the constructor and from `update`; an already
        existing rate limiter is updated in place instead of replaced.
        """
        assert max_traces_per_second >= 0, \
            'max_traces_per_second must not be negative'
        self._tags = {
            SAMPLER_TYPE_TAG_KEY: SAMPLER_TYPE_RATE_LIMITING,
            SAMPLER_PARAM_TAG_KEY: max_traces_per_second,
        }
        self.traces_per_second = max_traces_per_second
        # The balance cap never drops below 1.0, whatever the rate is.
        max_balance = max(self.traces_per_second, 1.0)
        if self.rate_limiter:
            self.rate_limiter.update(max_traces_per_second, max_balance)
        else:
            self.rate_limiter = RateLimiter(
                credits_per_second=self.traces_per_second,
                max_balance=max_balance,
            )

    def is_sampled(self, trace_id: int, operation: str = '') -> _IsSampledType:
        """Return (sampled, tags); the decision is whatever the rate
        limiter answers for a credit of 1.0. Both arguments are ignored."""
        return self.rate_limiter.check_credit(1.0), self._tags

    def close(self) -> None:
        """Nothing to release."""
        pass

    def __eq__(self, other: Any) -> bool:
        """The last_tick and balance fields can be different"""
        if not isinstance(other, self.__class__):
            return False
        mine = dict(self.rate_limiter.__dict__)
        theirs = dict(other.rate_limiter.__dict__)
        # Neutralise the fields that may differ by copying them across
        # before comparing the two snapshots.
        for volatile_field in ('balance', 'last_tick'):
            mine[volatile_field] = theirs[volatile_field]
        return mine == theirs

    def update(self, max_traces_per_second: float) -> bool:
        """Switch to a new rate; return True only if the rate changed."""
        if self.traces_per_second != max_traces_per_second:
            self._init(max_traces_per_second)
            return True
        return False

    def __str__(self) -> str:
        return 'RateLimitingSampler(%s)' % self.traces_per_second
class GuaranteedThroughputProbabilisticSampler(Sampler):
    """
    A sampler that leverages both ProbabilisticSampler and RateLimitingSampler.

    The RateLimitingSampler is used as a guaranteed lower bound sampler such
    that every operation is sampled at least once in a time interval defined
    by the lower_bound, i.e. a lower_bound of 1.0 / (60 * 10) will sample an
    operation at least once every 10 minutes.

    The ProbabilisticSampler is given higher priority when tags are emitted,
    i.e. if is_sampled() for both samplers returns true, the tags of the
    ProbabilisticSampler will be used.
    """

    def __init__(self, operation: str, lower_bound: float, rate: float) -> None:
        """
        :param operation: name of the operation this sampler is used for
        :param lower_bound: rate (traces per second) given to the
            lower-bound RateLimitingSampler
        :param rate: sampling probability given to the ProbabilisticSampler
        """
        super(GuaranteedThroughputProbabilisticSampler, self).__init__(
            tags={
                SAMPLER_TYPE_TAG_KEY: SAMPLER_TYPE_LOWER_BOUND,
                SAMPLER_PARAM_TAG_KEY: rate,
            }
        )
        self.probabilistic_sampler = ProbabilisticSampler(rate)
        self.lower_bound_sampler = RateLimitingSampler(lower_bound)
        self.operation = operation
        self.rate = rate
        self.lower_bound = lower_bound

    def is_sampled(self, trace_id: int, operation: str = '') -> _IsSampledType:
        """
        Return (sampled, tags).

        When the probabilistic sampler accepts the trace, its tags are
        returned; otherwise the decision of the lower-bound sampler is
        returned together with this sampler's own (lower-bound) tags.
        """
        sampled, tags = \
            self.probabilistic_sampler.is_sampled(trace_id, operation)
        if sampled:
            # The lower-bound sampler is still consulted and its result is
            # discarded -- presumably so that its rate limiter accounts for
            # this trace as well (RateLimiter is defined outside this file).
            self.lower_bound_sampler.is_sampled(trace_id, operation)
            return True, tags
        sampled, _ = self.lower_bound_sampler.is_sampled(trace_id, operation)
        return sampled, self._tags

    def close(self) -> None:
        """Close both underlying samplers."""
        self.probabilistic_sampler.close()
        self.lower_bound_sampler.close()

    def update(self, lower_bound: float, rate: float) -> None:
        """
        Apply a new lower bound and/or sampling probability.

        Only the parts that actually changed are rebuilt or updated.

        :param lower_bound: new rate for the lower-bound sampler; a float,
            as in the constructor
        :param rate: new sampling probability
        """
        # (NB) This function should only be called while holding a Write lock.
        if self.rate != rate:
            self.probabilistic_sampler = ProbabilisticSampler(rate)
            self.rate = rate
            self._tags = {
                SAMPLER_TYPE_TAG_KEY: SAMPLER_TYPE_LOWER_BOUND,
                SAMPLER_PARAM_TAG_KEY: rate,
            }
        if self.lower_bound != lower_bound:
            self.lower_bound_sampler.update(lower_bound)
            self.lower_bound = lower_bound

    def __str__(self) -> str:
        return 'GuaranteedThroughputProbabilisticSampler(%s, %f, %f)' \
            % (self.operation, self.rate, self.lower_bound)
class AdaptiveSampler(Sampler):
    """
    A sampler that leverages both ProbabilisticSampler and RateLimitingSampler
    via the GuaranteedThroughputProbabilisticSampler. This sampler keeps track
    of all operations and delegates calls to the respective
    GuaranteedThroughputProbabilisticSampler.
    """

    def __init__(self, strategies: Dict[str, Any], max_operations: int) -> None:
        """
        :param strategies: dictionary holding the per-operation strategies
            plus the default sampling probability and default lower bound;
            missing entries fall back to the module defaults
        :param max_operations: maximum number of per-operation samplers
            that `is_sampled` will create on demand
        """
        super(AdaptiveSampler, self).__init__()
        lower_bound = strategies.get(DEFAULT_LOWER_BOUND_STR, DEFAULT_LOWER_BOUND)
        default_probability = strategies.get(
            DEFAULT_SAMPLING_PROBABILITY_STR, DEFAULT_SAMPLING_PROBABILITY)

        samplers = {}
        for strategy in strategies.get(STRATEGIES_STR, []):
            operation = strategy.get(OPERATION_STR)
            samplers[operation] = GuaranteedThroughputProbabilisticSampler(
                operation,
                lower_bound,
                get_sampling_probability(strategy)
            )
        self.samplers = samplers

        # Used for operations seen after `max_operations` has been reached.
        self.default_sampler = ProbabilisticSampler(default_probability)
        self.default_sampling_probability = default_probability
        self.lower_bound = lower_bound
        self.max_operations = max_operations

    def is_sampled(self, trace_id: int, operation: str = '') -> _IsSampledType:
        """
        Delegate the decision to the sampler registered for `operation`.

        An unknown operation gets a new per-operation sampler built from
        the defaults, unless `max_operations` samplers already exist, in
        which case the shared default probabilistic sampler decides.
        """
        sampler = self.samplers.get(operation)
        if not sampler:
            if len(self.samplers) >= self.max_operations:
                return self.default_sampler.is_sampled(trace_id, operation)
            sampler = GuaranteedThroughputProbabilisticSampler(
                operation,
                self.lower_bound,
                self.default_sampling_probability
            )
            self.samplers[operation] = sampler
        return sampler.is_sampled(trace_id, operation)

    def update(self, strategies: Dict[str, Any]) -> None:
        """
        Apply a new set of strategies.

        Existing per-operation samplers are updated in place, samplers for
        new operations are created, and the default sampler is rebuilt only
        when the default sampling probability changed.
        """
        # (NB) This function should only be called while holding a Write lock.
        # Both defaults are the same for every strategy, so look them up once.
        lower_bound = strategies.get(DEFAULT_LOWER_BOUND_STR, DEFAULT_LOWER_BOUND)
        default_probability = strategies.get(
            DEFAULT_SAMPLING_PROBABILITY_STR, DEFAULT_SAMPLING_PROBABILITY)

        for strategy in strategies.get(STRATEGIES_STR, []):
            operation = strategy.get(OPERATION_STR)
            sampling_rate = get_sampling_probability(strategy)
            sampler = self.samplers.get(operation)
            if not sampler:
                self.samplers[operation] = GuaranteedThroughputProbabilisticSampler(
                    operation,
                    lower_bound,
                    sampling_rate
                )
            else:
                sampler.update(lower_bound, sampling_rate)

        self.lower_bound = lower_bound
        if self.default_sampling_probability != default_probability:
            self.default_sampling_probability = default_probability
            self.default_sampler = \
                ProbabilisticSampler(self.default_sampling_probability)

    def close(self) -> None:
        """Close every per-operation sampler."""
        for sampler in self.samplers.values():
            sampler.close()

    def __str__(self) -> str:
        return 'AdaptiveSampler(%f, %f, %d)' \
            % (self.default_sampling_probability, self.lower_bound,
               self.max_operations)
class RemoteControlledSampler(Sampler):
    """Periodically loads the sampling strategy from a remote server."""

    def __init__(self, channel: Any, service_name: str, **kwargs: Any) -> None:
        """
        :param channel: channel for communicating with jaeger-agent
        :param service_name: name of this application
        :param kwargs: optional parameters
            - init_sampler: initial value of the sampler,
                else ProbabilisticSampler(0.001)
            - sampling_refresh_interval: interval in seconds for polling
                for new strategy
            - logger: Logger instance
            - metrics: metrics facade, used to emit metrics on errors.
                This parameter has been deprecated, please use
                metrics_factory instead.
            - metrics_factory: used to generate metrics for errors
            - error_reporter: ErrorReporter instance
            - max_operations: maximum number of unique operations the
                AdaptiveSampler will keep track of
        """
        super(RemoteControlledSampler, self).__init__()
        self._channel = channel
        self.service_name = service_name
        self.logger = kwargs.get('logger', default_logger)
        self.sampler = kwargs.get('init_sampler')
        self.sampling_refresh_interval = \
            kwargs.get('sampling_refresh_interval') or DEFAULT_SAMPLING_INTERVAL
        self.metrics_factory = kwargs.get('metrics_factory') \
            or LegacyMetricsFactory(kwargs.get('metrics') or Metrics())
        self.metrics = SamplerMetrics(self.metrics_factory)
        self.error_reporter = kwargs.get('error_reporter') or \
            ErrorReporter(Metrics())
        self.max_operations = kwargs.get('max_operations') or \
            DEFAULT_MAX_OPERATIONS

        if not self.sampler:
            self.sampler = ProbabilisticSampler(DEFAULT_SAMPLING_PROBABILITY)
        else:
            self.sampler.is_sampled(0)  # assert we got valid sampler API

        # Guards `sampler`, `running` and `periodic`.
        self.lock = Lock()
        self.running = True
        self.periodic = None

        self.io_loop = channel.io_loop
        if not self.io_loop:
            self.logger.error(
                'Cannot acquire IOLoop, sampler will not be updated')
        else:
            # according to IOLoop docs, it's not safe to use timeout methods
            # unless already running in the loop, so we use `add_callback`
            self.io_loop.add_callback(self._init_polling)

    def is_sampled(self, trace_id: int, operation: str = '') -> _IsSampledType:
        """Delegate the decision to the current sampler, under the lock."""
        with self.lock:
            assert self.sampler  # needed for mypy
            return self.sampler.is_sampled(trace_id, operation)

    def _init_polling(self):
        """
        Bootstrap polling for sampling strategy.

        To avoid spiky traffic from the samplers, we use a random delay
        before the first poll.
        """
        with self.lock:
            if not self.running:
                return
            r = random.Random()
            delay = r.random() * self.sampling_refresh_interval
            self.io_loop.call_later(delay=delay,
                                    callback=self._delayed_polling)
            self.logger.info(
                'Delaying sampling strategy polling by %d sec', delay)

    def _delayed_polling(self):
        """
        Run the first poll and start the periodic polling cycle.

        The periodic callback is only stored and started if the sampler
        has not been closed in the meantime.
        """
        periodic = self._create_periodic_callback()
        self._poll_sampling_manager()  # Initialize sampler now
        with self.lock:
            if not self.running:
                return
            self.periodic = periodic
            periodic.start()  # start the periodic cycle
            self.logger.info(
                'Tracing sampler started with sampling refresh '
                'interval %d sec', self.sampling_refresh_interval)

    def _create_periodic_callback(self):
        """Build the PeriodicCallback that polls for a new strategy."""
        return PeriodicCallback(
            callback=self._poll_sampling_manager,
            # convert interval to milliseconds
            callback_time=self.sampling_refresh_interval * 1000)

    def _sampling_request_callback(self, future):
        """
        Handle the completed sampling strategy request.

        A failed request or an unparsable body is counted as a query
        failure and reported; otherwise the parsed strategy is applied.
        """
        exception = future.exception()
        if exception:
            self.metrics.sampler_query_failure(1)
            self.error_reporter.error(
                'Fail to get sampling strategy from jaeger-agent: %s',
                exception)
            return

        response = future.result()

        # In Python 3.5 response.body is of type bytes and json.loads() does only support str
        # See: https://github.com/jaegertracing/jaeger-client-python/issues/180
        if hasattr(response.body, 'decode') and callable(response.body.decode):
            response_body = response.body.decode('utf-8')
        else:
            response_body = response.body

        try:
            sampling_strategies_response = json.loads(response_body)
            self.metrics.sampler_retrieved(1)
        except Exception as e:
            self.metrics.sampler_query_failure(1)
            self.error_reporter.error(
                'Fail to parse sampling strategy '
                'from jaeger-agent: %s [%s]', e, response_body)
            return

        self._update_sampler(sampling_strategies_response)
        self.logger.debug('Tracing sampler set to %s', self.sampler)

    def _update_sampler(self, response):
        """
        Apply a parsed strategy response to the current sampler.

        A response with per-operation strategies is handled by the
        adaptive sampler; anything else is treated as a probabilistic or
        rate limiting strategy. Any failure is counted and reported
        rather than raised.
        """
        with self.lock:
            try:
                if response.get(OPERATION_SAMPLING_STR):
                    self._update_adaptive_sampler(response.get(OPERATION_SAMPLING_STR))
                else:
                    self._update_rate_limiting_or_probabilistic_sampler(response)
            except Exception as e:
                self.metrics.sampler_update_failure(1)
                # Note the trailing space: the two literals are concatenated.
                self.error_reporter.error(
                    'Fail to update sampler '
                    'from jaeger-agent: %s [%s]', e, response)

    def _update_adaptive_sampler(self, per_operation_strategies):
        """Update the AdaptiveSampler in place, or switch to a new one if
        the current sampler is of a different type."""
        if isinstance(self.sampler, AdaptiveSampler):
            self.sampler.update(per_operation_strategies)
        else:
            self.sampler = AdaptiveSampler(per_operation_strategies, self.max_operations)
        self.metrics.sampler_updated(1)

    def _update_rate_limiting_or_probabilistic_sampler(self, response):
        """
        Apply a PROBABILISTIC or RATE_LIMITING strategy.

        :raises ValueError: if the strategy type is unsupported or the
            rate limit is outside the [0, 500) range
        """
        s_type = response.get(STRATEGY_TYPE_STR)
        new_sampler = self.sampler
        if s_type == PROBABILISTIC_SAMPLING_STRATEGY:
            sampling_rate = get_sampling_probability(response)
            new_sampler = ProbabilisticSampler(rate=sampling_rate)
        elif s_type == RATE_LIMITING_SAMPLING_STRATEGY:
            mtps = get_rate_limit(response)
            if mtps < 0 or mtps >= 500:
                raise ValueError(
                    'Rate limiting parameter not in [0, 500) range: %s' % mtps)
            if isinstance(self.sampler, RateLimitingSampler):
                # Update in place; `new_sampler` still refers to the same
                # object, so the comparison below does not count it twice.
                if self.sampler.update(max_traces_per_second=mtps):
                    self.metrics.sampler_updated(1)
            else:
                new_sampler = RateLimitingSampler(max_traces_per_second=mtps)
        else:
            raise ValueError('Unsupported sampling strategy type: %s' % s_type)

        if self.sampler != new_sampler:
            self.sampler = new_sampler
            self.metrics.sampler_updated(1)

    def _poll_sampling_manager(self):
        """Request the sampling strategy for this service; the result is
        processed by `_sampling_request_callback`."""
        self.logger.debug('Requesting tracing sampler refresh')
        fut = self._channel.request_sampling_strategy(self.service_name)
        fut.add_done_callback(self._sampling_request_callback)

    def close(self) -> None:
        """Mark the sampler as stopped and stop the periodic polling."""
        with self.lock:
            self.running = False
            if self.periodic:
                self.periodic.stop()
def get_sampling_probability(strategy: Optional[Dict[str, Any]] = None) -> float:
    """
    Extract the sampling rate from the probabilistic part of `strategy`.

    DEFAULT_SAMPLING_PROBABILITY is returned when the strategy, its
    probabilistic section, or the sampling rate itself is missing.
    """
    probabilistic = strategy.get(PROBABILISTIC_SAMPLING_STR) if strategy else None
    if probabilistic:
        return probabilistic.get(SAMPLING_RATE_STR, DEFAULT_SAMPLING_PROBABILITY)
    return DEFAULT_SAMPLING_PROBABILITY
def get_rate_limit(strategy: Optional[Dict[str, Any]] = None) -> float:
    """
    Extract the maximum traces per second from the rate limiting part of
    `strategy`.

    DEFAULT_LOWER_BOUND is returned when the strategy, its rate limiting
    section, or the limit itself is missing.
    """
    rate_limiting = strategy.get(RATE_LIMITING_SAMPLING_STR) if strategy else None
    if rate_limiting:
        return rate_limiting.get(MAX_TRACES_PER_SECOND_STR, DEFAULT_LOWER_BOUND)
    return DEFAULT_LOWER_BOUND
class SamplerMetrics(object):
    """Counters describing sampler strategy queries and sampler updates."""

    def __init__(self, metrics_factory: MetricsFactory) -> None:
        queries = 'jaeger:sampler_queries'
        updates = 'jaeger:sampler_updates'

        # Outcome of querying for a sampling strategy.
        self.sampler_retrieved = metrics_factory.create_counter(
            name=queries, tags={'result': 'ok'})
        self.sampler_query_failure = metrics_factory.create_counter(
            name=queries, tags={'result': 'err'})

        # Outcome of applying a retrieved strategy to the sampler.
        self.sampler_updated = metrics_factory.create_counter(
            name=updates, tags={'result': 'ok'})
        self.sampler_update_failure = metrics_factory.create_counter(
            name=updates, tags={'result': 'err'})
|