1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
import weakref
import sentry_sdk
from sentry_sdk.consts import OP
from sentry_sdk.api import continue_trace
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
from sentry_sdk.utils import (
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
format_timestamp,
parse_version,
)
try:
from rq.queue import Queue
from rq.timeouts import JobTimeoutException
from rq.version import VERSION as RQ_VERSION
from rq.worker import Worker
from rq.job import JobStatus
except ImportError:
raise DidNotEnable("RQ not installed")
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Callable
from sentry_sdk._types import Event, EventProcessor
from sentry_sdk.utils import ExcInfo
from rq.job import Job
class RqIntegration(Integration):
    """Sentry integration for RQ.

    ``setup_once`` replaces ``Worker.perform_job``, ``Worker.handle_exception``
    and ``Queue.enqueue_job`` with wrappers that call the original methods.
    """

    identifier = "rq"
    origin = f"auto.queue.{identifier}"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Validate the installed RQ version, then patch ``Worker`` and ``Queue``.

        Raises ``DidNotEnable`` when ``RQ_VERSION`` cannot be parsed or is
        older than 0.6.
        """
        rq_version = parse_version(RQ_VERSION)

        if rq_version is None:
            raise DidNotEnable("Unparsable RQ version: {}".format(RQ_VERSION))

        if rq_version < (0, 6):
            raise DidNotEnable("RQ 0.6 or newer is required.")

        # --- Worker.perform_job: run each job inside its own scope/transaction.
        original_perform_job = Worker.perform_job

        # NOTE(review): presumably the decorator falls back to the original
        # method when the integration is not enabled -- confirm in sentry_sdk.utils.
        @ensure_integration_enabled(RqIntegration, original_perform_job)
        def sentry_patched_perform_job(self, job, *args, **kwargs):
            # type: (Any, Job, *Queue, **Any) -> bool
            with sentry_sdk.new_scope() as job_scope:
                job_scope.clear_breadcrumbs()

                # The event processor only receives a weak reference to the job.
                weak_job = weakref.ref(job)
                job_scope.add_event_processor(_make_event_processor(weak_job))

                # Headers stored in `job.meta` by the patched `enqueue_job`
                # below; an empty dict is used when there are none.
                trace_headers = job.meta.get("_sentry_trace_headers") or {}
                transaction = continue_trace(
                    trace_headers,
                    op=OP.QUEUE_TASK_RQ,
                    name="unknown RQ task",
                    source=TRANSACTION_SOURCE_TASK,
                    origin=RqIntegration.origin,
                )

                # Keep the placeholder name if reading `func_name` fails.
                with capture_internal_exceptions():
                    transaction.name = job.func_name

                with sentry_sdk.start_transaction(
                    transaction,
                    custom_sampling_context={"rq_job": job},
                ):
                    result = original_perform_job(self, job, *args, **kwargs)

            if self.is_horse:
                # We're inside of a forked process and RQ is
                # about to call `os._exit`. Make sure that our
                # events get sent out.
                sentry_sdk.get_client().flush()

            return result

        Worker.perform_job = sentry_patched_perform_job

        # --- Worker.handle_exception: report exceptions of failed jobs.
        original_handle_exception = Worker.handle_exception

        def sentry_patched_handle_exception(self, job, *exc_info, **kwargs):
            # type: (Worker, Any, *Any, **Any) -> Any
            # Note, the order of the `or` here is important,
            # because calling `job.is_failed` will change `_status`.
            job_failed = job._status == JobStatus.FAILED or job.is_failed
            if job_failed:
                _capture_exception(exc_info)

            return original_handle_exception(self, job, *exc_info, **kwargs)

        Worker.handle_exception = sentry_patched_handle_exception

        # --- Queue.enqueue_job: attach trace headers to the job's metadata.
        original_enqueue_job = Queue.enqueue_job

        @ensure_integration_enabled(RqIntegration, original_enqueue_job)
        def sentry_patched_enqueue_job(self, job, **kwargs):
            # type: (Queue, Any, **Any) -> Any
            current_scope = sentry_sdk.get_current_scope()

            # Headers are only stored while a span is active on the scope.
            if current_scope.span is not None:
                propagation_headers = dict(
                    current_scope.iter_trace_propagation_headers()
                )
                job.meta["_sentry_trace_headers"] = propagation_headers

            return original_enqueue_job(self, job, **kwargs)

        Queue.enqueue_job = sentry_patched_enqueue_job

        ignore_logger("rq.worker")
def _make_event_processor(weak_job):
# type: (Callable[[], Job]) -> EventProcessor
def event_processor(event, hint):
# type: (Event, dict[str, Any]) -> Event
job = weak_job()
if job is not None:
with capture_internal_exceptions():
extra = event.setdefault("extra", {})
rq_job = {
"job_id": job.id,
"func": job.func_name,
"args": job.args,
"kwargs": job.kwargs,
"description": job.description,
}
if job.enqueued_at:
rq_job["enqueued_at"] = format_timestamp(job.enqueued_at)
if job.started_at:
rq_job["started_at"] = format_timestamp(job.started_at)
extra["rq-job"] = rq_job
if "exc_info" in hint:
with capture_internal_exceptions():
if issubclass(hint["exc_info"][0], JobTimeoutException):
event["fingerprint"] = ["rq", "JobTimeoutException", job.func_name]
return event
return event_processor
def _capture_exception(exc_info, **kwargs):
    # type: (ExcInfo, **Any) -> None
    """Turn ``exc_info`` into an event and send it via ``sentry_sdk.capture_event``.

    The event is built with the current client's options and tagged with an
    unhandled ``"rq"`` mechanism. Extra keyword arguments are accepted but
    not used.
    """
    options = sentry_sdk.get_client().options
    rq_mechanism = {"type": "rq", "handled": False}

    event, hint = event_from_exception(
        exc_info,
        client_options=options,
        mechanism=rq_mechanism,
    )

    sentry_sdk.capture_event(event, hint=hint)
|