File: integration.py

package info (click to toggle)
python-django-guid 3.5.2-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 664 kB
  • sloc: python: 1,267; makefile: 16
file content (54 lines) | stat: -rw-r--r-- 2,119 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import logging
from typing import Any

from django_guid.integrations import Integration

# Module-level logger obtained under the 'django_guid' logger name.
logger = logging.getLogger('django_guid')


class CeleryIntegration(Integration):
    """
    Celery integration that carries correlation IDs across process boundaries.

    A correlation ID is handed from a parent process to its child process, i.e. from a request to a worker,
    or from one worker to another worker.

    Workers that execute scheduled tasks get a newly generated correlation ID for every new task.
    """

    identifier = 'CeleryIntegration'

    def __init__(
        self,
        use_django_logging: bool = False,
        log_parent: bool = False,
        uuid_length: int = 32,
        sentry_integration: bool = False,
    ) -> None:
        """
        Runs the base initializer, then stores the given options on the instance unchanged.

        :param use_django_logging: When true, Celery is configured to use the logging settings from settings.py
        :param log_parent: When true, the origin of a task is traced. Needs to be True in order to use the
                           CeleryTracing log filter.
        :param uuid_length: Optional length of the celery IDs generated for the log filter
        :param sentry_integration: Stored on the instance as given; presumably read by the Celery signal
                                   handlers to enable Sentry support - TODO confirm against the signals module.
        """
        super().__init__()
        self.use_django_logging = use_django_logging
        self.log_parent = log_parent
        self.uuid_length = uuid_length
        self.sentry_integration = sentry_integration

    def setup(self) -> None:
        """
        Loads Celery signals.

        The imports below exist only for their side effect of loading the pre-configured signal modules;
        the imported names themselves are not used in this method.
        """
        # These pre-configured signals pass the correlation ID on to a celery worker,
        # or generate a correlation ID when a worker starts a scheduled task.
        from django_guid.integrations.celery.signals import (  # noqa
            before_task_publish,
            task_postrun,
            task_prerun,
        )

        if not self.use_django_logging:
            return

        # This pre-configured signal makes Celery adopt the log config from settings.py.
        from django_guid.integrations.celery.logging import config_loggers  # noqa

    def run(self, guid: str, **kwargs: Any) -> None:
        """
        Intentionally a no-op: Celery tracing only requires the signals to be registered during setup.
        """
        return None  # pragma: no cover