File: steps.py

package info (click to toggle)
python-django-structlog 9.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,004 kB
  • sloc: python: 3,509; sh: 206; javascript: 79; makefile: 19
file content (22 lines) | stat: -rw-r--r-- 641 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from typing import Any

from celery import bootsteps

from .receivers import CeleryReceiver


class DjangoStructLogInitStep(bootsteps.Step):
    """Boot step that sets up ``django_structlog`` inside a ``celery`` worker.

    Add the step to the ``worker`` steps of the application:

    >>> from celery import Celery
    >>> from django_structlog.celery.steps import DjangoStructLogInitStep
    >>>
    >>> app = Celery("django_structlog_demo_project")
    >>> app.steps['worker'].add(DjangoStructLogInitStep)

    On construction the step creates a ``CeleryReceiver``, keeps it on
    ``self.receiver`` and calls its ``connect_worker_signals()`` method.
    """

    def __init__(self, parent: Any, **kwargs: Any) -> None:
        """Initialize the parent step, then create and connect the receiver.

        :param parent: forwarded unchanged to ``bootsteps.Step.__init__``.
        :param kwargs: extra keyword arguments, forwarded unchanged as well.
        """
        super().__init__(parent, **kwargs)
        # Store the receiver on the instance before connecting it, so the
        # attribute is already set when the signals get wired up.
        receiver = self.receiver = CeleryReceiver()
        receiver.connect_worker_signals()