1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
import logging
from typing import TYPE_CHECKING, Any
from celery.signals import before_task_publish, task_postrun, task_prerun
from django_guid import clear_guid, get_guid, set_guid
from django_guid.config import settings
from django_guid.integrations.celery.context import celery_current, celery_parent
from django_guid.utils import generate_guid
if TYPE_CHECKING:
from celery import Task
# Logger shared by every signal handler in this module; the name is spelled
# out explicitly instead of being derived from __name__.
logger = logging.getLogger('django_guid.celery')
# Task header key used to carry the publisher's current ID to the published
# task, where it is read back as that task's parent ID.
parent_header = 'CELERY_PARENT_ID'
def set_transaction_id(guid: str) -> None:
    """
    Tags the active Sentry scope with the GUID as ``transaction_id``.

    Does nothing unless the Celery ``sentry_integration`` setting is True.

    :param guid: The correlation ID to attach to Sentry events.
    """
    if not settings.integration_settings.celery.sentry_integration:
        return

    # Imported here, on the enabled path only, so these modules are not loaded
    # when the Sentry integration is switched off.
    import sentry_sdk
    from packaging import version

    if version.parse(sentry_sdk.VERSION) >= version.parse('2.12.0'):
        # ``sentry_sdk.isolation_scope()`` is a context manager that forks the
        # isolation scope and restores the previous one on exit, so a tag set
        # inside it would be thrown away immediately. Tag the live isolation
        # scope instead so the value is still present while the task runs.
        sentry_sdk.get_isolation_scope().set_tag('transaction_id', guid)
    else:
        # Older SDK versions expose the active scope through configure_scope().
        with sentry_sdk.configure_scope() as scope:
            scope.set_tag('transaction_id', guid)

    logger.debug('Setting Sentry transaction_id to %s', guid)
@before_task_publish.connect
def publish_task_from_worker_or_request(headers: dict, **kwargs: Any) -> None:
    """
    Copies the publisher's correlation state onto an outgoing task.

    Runs whenever a request or a celery worker publishes a task (task.delay(),
    task.apply_async() or an equivalent call). The current GUID is written to
    the task headers, and, when parent logging is enabled, so is the ID of the
    task doing the publishing.

    :param headers: The mutable header mapping of the task being published.
    """
    correlation_id = get_guid()
    logger.info('Setting task request header as %s', correlation_id)
    headers[settings.guid_header_name] = correlation_id

    if not settings.integration_settings.celery.log_parent:
        return

    # Only a task that is itself running inside a worker has a current ID to
    # hand down; nothing is added when there is none.
    publisher_id = celery_current.get()
    if publisher_id:
        headers[parent_header] = publisher_id
@task_prerun.connect
def worker_prerun(task: 'Task', **kwargs: Any) -> None:
    """
    Establishes the correlation IDs for a task right before it executes.

    The GUID is taken from the task request when one was passed along,
    otherwise a new one is generated. It is then stored as the active GUID and
    forwarded to Sentry. This mirrors, for Celery, what the django-guid
    middleware does for a request.

    When parent logging is enabled, the parent ID found on the task request
    (if any) is recorded and a new current ID is generated for this task.

    :param task: The task that is about to run.
    """
    celery_settings = settings.integration_settings.celery

    header_guid = task.request.get(settings.guid_header_name)
    if header_guid:
        active_guid = header_guid
        log_template = 'Setting GUID %s'
    else:
        active_guid = generate_guid(uuid_length=celery_settings.uuid_length)
        log_template = 'Generated GUID %s'

    logger.info(log_template, active_guid)
    set_guid(active_guid)
    set_transaction_id(active_guid)

    if not celery_settings.log_parent:
        return

    parent_id = task.request.get(parent_header)
    if parent_id:
        logger.info('Setting parent ID %s', parent_id)
        celery_parent.set(parent_id)

    # Every task gets its own current ID, whether or not it has a parent.
    current_id = generate_guid(uuid_length=celery_settings.uuid_length)
    logger.info('Generated current ID %s', current_id)
    celery_current.set(current_id)
@task_postrun.connect
def clean_up(task: 'Task', **kwargs: Any) -> None:
    """
    Resets the IDs set before the task ran.

    Runs after a task has finished, so the next task picked up by the same
    worker does not inherit this task's IDs.

    :param task: The task that just finished.
    """
    logger.debug('Cleaning up GUIDs')
    clear_guid()

    if not settings.integration_settings.celery.log_parent:
        return

    # Reset the current ID first, then the parent ID.
    for tracked_id in (celery_current, celery_parent):
        tracked_id.set(None)
|