1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
|
import random
from collections import Counter
from unittest import mock
import sentry_sdk
from sentry_sdk.transport import Transport
class HealthyTestTransport(Transport):
def capture_envelope(self, _):
pass
def is_healthy(self):
return True
class UnhealthyTestTransport(HealthyTestTransport):
def is_healthy(self):
return False
def test_no_monitor_if_disabled(sentry_init):
sentry_init(
transport=HealthyTestTransport(),
enable_backpressure_handling=False,
)
assert sentry_sdk.get_client().monitor is None
def test_monitor_if_enabled(sentry_init):
sentry_init(transport=HealthyTestTransport())
monitor = sentry_sdk.get_client().monitor
assert monitor is not None
assert monitor._thread is None
assert monitor.is_healthy() is True
assert monitor.downsample_factor == 0
assert monitor._thread is not None
assert monitor._thread.name == "sentry.monitor"
def test_monitor_unhealthy(sentry_init):
sentry_init(transport=UnhealthyTestTransport())
monitor = sentry_sdk.get_client().monitor
monitor.interval = 0.1
assert monitor.is_healthy() is True
for i in range(15):
monitor.run()
assert monitor.is_healthy() is False
assert monitor.downsample_factor == (i + 1 if i < 10 else 10)
def test_transaction_uses_downsampled_rate(
sentry_init, capture_record_lost_event_calls, monkeypatch
):
sentry_init(
traces_sample_rate=1.0,
transport=UnhealthyTestTransport(),
)
record_lost_event_calls = capture_record_lost_event_calls()
monitor = sentry_sdk.get_client().monitor
monitor.interval = 0.1
# make sure rng doesn't sample
monkeypatch.setattr(random, "random", lambda: 0.9)
assert monitor.is_healthy() is True
monitor.run()
assert monitor.is_healthy() is False
assert monitor.downsample_factor == 1
with sentry_sdk.start_transaction(name="foobar") as transaction:
assert transaction.sampled is False
assert transaction.sample_rate == 0.5
assert Counter(record_lost_event_calls) == Counter(
[
("backpressure", "transaction", None, 1),
("backpressure", "span", None, 1),
]
)
def test_monitor_no_thread_on_shutdown_no_errors(sentry_init):
sentry_init(transport=HealthyTestTransport())
# make it seem like the interpreter is shutting down
with mock.patch(
"threading.Thread.start",
side_effect=RuntimeError("can't create new thread at interpreter shutdown"),
):
monitor = sentry_sdk.get_client().monitor
assert monitor is not None
assert monitor._thread is None
monitor.run()
assert monitor._thread is None
|