File: celery.py

package info (click to toggle)
python-django-structlog 9.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,004 kB
  • sloc: python: 3,509; sh: 206; javascript: 79; makefile: 19
file content (115 lines) | stat: -rw-r--r-- 3,225 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import logging
import logging.config
import os

import structlog
from celery import Celery, shared_task, signals
from django.apps import AppConfig, apps
from django.conf import settings

from django_structlog.celery.steps import DjangoStructLogInitStep

# Only provide a fallback settings module when Django settings have not
# already been configured by the importing process.
if not settings.configured:
    # set the default Django settings module for the 'celery' program.
    # setdefault leaves an already-exported DJANGO_SETTINGS_MODULE untouched.
    os.environ.setdefault(
        "DJANGO_SETTINGS_MODULE", "config.settings.local"
    )  # pragma: no cover


# Module-level Celery application; the code further down in this file
# (AppConfig.ready) refers to it by this name.
# NOTE(review): namespace="CELERY" presumably restricts configuration to
# CELERY_-prefixed settings -- confirm against the Celery documentation.
app = Celery("django_structlog_demo_project", namespace="CELERY")

# Read the Celery configuration from the Django settings object.
app.config_from_object("django.conf:settings")

# A step to initialize django-structlog; it is added to the worker's steps.
app.steps["worker"].add(DjangoStructLogInitStep)


@signals.setup_logging.connect
def receiver_setup_logging(
    loglevel, logfile, format, colorize, **kwargs
):  # pragma: no cover
    """Configure stdlib logging and structlog when Celery sets up logging.

    Connected to Celery's ``setup_logging`` signal. The signal arguments
    (``loglevel``, ``logfile``, ``format``, ``colorize`` and any extra
    keyword arguments) are accepted but not used: the configuration comes
    entirely from ``settings.LOGGING`` and the processor chain below.
    """
    # Standard-library logging is driven by the Django LOGGING setting.
    logging.config.dictConfig(settings.LOGGING)

    # The chain ends with ProcessorFormatter.wrap_for_formatter, so the
    # final rendering is left to the stdlib formatter side.
    processor_chain = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.filter_by_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ]
    stdlib_factory = structlog.stdlib.LoggerFactory()

    structlog.configure(
        processors=processor_chain,
        logger_factory=stdlib_factory,
        cache_logger_on_first_use=True,
    )


class CeleryAppConfig(AppConfig):
    """Django app configuration that triggers Celery task autodiscovery."""

    name = "django_structlog_demo_project.taskapp"
    verbose_name = "Celery Config"

    def ready(self):
        """Point the Celery app at every installed Django app for task discovery.

        The app names are collected once, here, and handed to
        ``app.autodiscover_tasks`` through a callable that returns that
        already-built list.
        """
        discovered_names = [cfg.name for cfg in apps.get_app_configs()]

        def package_names():
            # Returns the list captured above; nothing is recomputed.
            return discovered_names

        app.autodiscover_tasks(package_names, force=True)


@shared_task
def successful_task(foo=None):
    """Log one info event and finish without raising.

    ``foo`` is accepted but not used by the body.
    """
    # NOTE(review): structlog is also imported at module level; this local
    # import is kept as-is so the task behaves exactly as before.
    import structlog

    structlog.getLogger(__name__).info("This is a successful task")


@shared_task
def failing_task(foo=None, **kwargs):
    """Fail unconditionally by raising a plain ``Exception``.

    ``foo`` and any extra keyword arguments are accepted but ignored.

    Raises:
        Exception: always, with a fixed message.
    """
    failure = Exception("This is a failed task")
    raise failure


@shared_task
def nesting_task():
    """Bind ``foo="Bar"`` to the structlog context, log, then queue ``nested_task``."""
    structlog.contextvars.bind_contextvars(foo="Bar")
    structlog.getLogger(__name__).info("This is a nesting task")

    # Hand the child task off via .delay() rather than calling it inline.
    nested_task.delay()


@shared_task
def nested_task():
    """Log one info event; this is the task queued by ``nesting_task``."""
    structlog.getLogger(__name__).info("This is a nested task")


@shared_task
def scheduled_task():
    """Log one info event announcing that the scheduled task ran."""
    structlog.getLogger(__name__).info("This is a scheduled task")


@shared_task
def rejected_task():
    """Do nothing; the body is intentionally empty.

    The ``before_task_publish`` receiver in this module looks for this
    task's name and strips the ``task`` header from its message.
    """


# The task below is only defined (and therefore only registered through
# @shared_task) when settings.IS_WORKER is falsy, so a process with
# IS_WORKER set never sees a definition for it.
if not settings.IS_WORKER:  # pragma: no branch

    @shared_task
    def unknown_task():
        """Simulate a task unavailable in the worker for demonstration purpose"""


@signals.before_task_publish.connect
def corrupt_rejected_task(sender=None, headers=None, body=None, **kwargs):
    """Simulate celery's task rejection mechanism by breaking up the message.

    Connected to Celery's ``before_task_publish`` signal. When the message
    being published targets ``rejected_task``, a warning is logged and the
    ``task`` header is removed in place; any other message is left untouched.

    Args:
        sender: Unused.
        headers: Mutable mapping of message headers. The ``task`` and ``id``
            keys are read; ``task`` is deleted for the targeted message.
            Nothing happens when it is ``None`` or empty.
        body: Unused.
        **kwargs: Remaining signal arguments, unused.
    """
    # ``headers`` defaults to None, so guard before dereferencing it.
    if not headers:
        return

    target_name = f"{rejected_task.__module__}.{rejected_task.__name__}"
    if headers.get("task") != target_name:
        return

    logger = structlog.getLogger(__name__)
    # ``warning`` is the canonical method name; ``warn`` is only a legacy
    # alias (deprecated in the standard library's logging module).
    logger.warning(
        f"corrupting {rejected_task.__name__}",
        task_id=headers.get("id"),
    )
    del headers["task"]