.. _celery_integration:
Celery Integration
==================
Getting Started with Celery
^^^^^^^^^^^^^^^^^^^^^^^^^^^
To support celery, you need to configure both your web app and your workers.
.. warning::

    If JSON is used to serialize your celery tasks, the log context in use when executing a task (through ``apply_async`` or ``delay``) should only contain JSON-serializable data. You can use modify_context_before_task_publish_ to ensure this is the case.
Replace your requirements
-------------------------
First of all, make sure your ``django-structlog`` installation knows you use ``celery`` in order to validate compatibility with your installed version. See `Installing “Extras” <https://packaging.python.org/en/latest/tutorials/installing-packages/#installing-extras>`_ for more information.
Replace ``django-structlog`` with ``django-structlog[celery]`` in your ``requirements.txt``.
.. code-block:: python

    django-structlog[celery]==X.Y.Z
Enable celery integration in your web app
-----------------------------------------
In your ``settings.py``:

.. code-block:: python

    MIDDLEWARE = [
        # ...
        'django_structlog.middlewares.RequestMiddleware',
    ]

    DJANGO_STRUCTLOG_CELERY_ENABLED = True
Initialize Celery Worker with DjangoStructLogInitStep
-----------------------------------------------------
In your celery AppConfig's module:

.. code-block:: python

    # logging.config is imported explicitly because dictConfig is used
    # when configuring celery's logger in the same file.
    import logging.config

    import structlog
    from celery import Celery
    from celery.signals import setup_logging
    from django_structlog.celery.steps import DjangoStructLogInitStep

    app = Celery("your_celery_project")

    # A step to initialize django-structlog
    app.steps['worker'].add(DjangoStructLogInitStep)
.. warning::

    If you use ``celery``'s `task_protocol v1 <https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-protocol>`_, ``django-structlog`` will not be able to transfer metadata to child tasks.

    Example:

    .. code-block:: python

        app = Celery("your_celery_project", task_protocol=1)
Configure celery's logger
-------------------------
In the same file as before:

.. code-block:: python

    @setup_logging.connect
    def receiver_setup_logging(loglevel, logfile, format, colorize, **kwargs):  # pragma: no cover
        logging.config.dictConfig(
            {
                "version": 1,
                "disable_existing_loggers": False,
                "formatters": {
                    "json_formatter": {
                        "()": structlog.stdlib.ProcessorFormatter,
                        "processor": structlog.processors.JSONRenderer(),
                    },
                    "plain_console": {
                        "()": structlog.stdlib.ProcessorFormatter,
                        "processor": structlog.dev.ConsoleRenderer(),
                    },
                    "key_value": {
                        "()": structlog.stdlib.ProcessorFormatter,
                        "processor": structlog.processors.KeyValueRenderer(
                            key_order=['timestamp', 'level', 'event', 'logger']
                        ),
                    },
                },
                "handlers": {
                    "console": {
                        "class": "logging.StreamHandler",
                        "formatter": "plain_console",
                    },
                    "json_file": {
                        "class": "logging.handlers.WatchedFileHandler",
                        "filename": "logs/json.log",
                        "formatter": "json_formatter",
                    },
                    "flat_line_file": {
                        "class": "logging.handlers.WatchedFileHandler",
                        "filename": "logs/flat_line.log",
                        "formatter": "key_value",
                    },
                },
                "loggers": {
                    "django_structlog": {
                        "handlers": ["console", "flat_line_file", "json_file"],
                        "level": "INFO",
                    },
                    "django_structlog_demo_project": {
                        "handlers": ["console", "flat_line_file", "json_file"],
                        "level": "INFO",
                    },
                }
            }
        )

        structlog.configure(
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.stdlib.filter_by_level,
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.stdlib.add_logger_name,
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.UnicodeDecoder(),
                structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
            ],
            logger_factory=structlog.stdlib.LoggerFactory(),
            cache_logger_on_first_use=True,
        )
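
The file handlers in this configuration write to ``logs/json.log`` and ``logs/flat_line.log``. The standard library's file handlers open their file as soon as they are constructed, so ``dictConfig`` raises an error when the ``logs`` directory is missing. The following is a minimal sketch of creating the directory first, using only the standard library. The ``configure_file_logging`` helper, the ``demo`` logger name and the temporary directory are illustrative assumptions, not part of ``django-structlog``.

```python
import logging
import logging.config
import tempfile
from pathlib import Path


def configure_file_logging(log_dir: Path) -> None:
    """Create ``log_dir`` if needed, then attach a file handler under it."""
    # File handlers open their file on construction, so the directory
    # has to exist before dictConfig() runs.
    log_dir.mkdir(parents=True, exist_ok=True)
    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "handlers": {
                "flat_line_file": {
                    "class": "logging.handlers.WatchedFileHandler",
                    "filename": str(log_dir / "flat_line.log"),
                },
            },
            "loggers": {
                # "demo" is a hypothetical logger name used for this sketch.
                "demo": {"handlers": ["flat_line_file"], "level": "INFO"},
            },
        }
    )


# A temporary directory stands in for the project's "logs" directory.
log_dir = Path(tempfile.mkdtemp()) / "logs"
configure_file_logging(log_dir)
logging.getLogger("demo").info("worker logging configured")
```

In a real worker, the same ``mkdir`` call could be placed at the top of ``receiver_setup_logging``, before ``logging.config.dictConfig`` is invoked.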
.. _celery_signals:
Signals
^^^^^^^
.. _modify_context_before_task_publish:
modify_context_before_task_publish
----------------------------------
You can connect to the ``modify_context_before_task_publish`` signal to modify the metadata before it is stored in the task's message.
For example, you can strip down the ``context`` to keep only some of the keys:
.. code-block:: python

    from django.dispatch import receiver
    from django_structlog.celery import signals

    @receiver(signals.modify_context_before_task_publish)
    def receiver_modify_context_before_task_publish(sender, signal, context, task_routing_key=None, task_properties=None, **kwargs):
        keys_to_keep = {"request_id", "parent_task_id"}
        new_dict = {key_to_keep: context[key_to_keep] for key_to_keep in keys_to_keep if key_to_keep in context}
        # Mutate the context in place so the change is visible to the sender.
        context.clear()
        context.update(new_dict)
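
The same signal can also help with the JSON serialization warning at the top of this page. The following is a minimal sketch of a helper that drops every value the standard library's ``json.dumps`` rejects. The name ``drop_non_json_values`` is hypothetical, and ``json.dumps`` is only an approximation here: the serializer celery actually uses may accept more types than the standard library does.

```python
import json


def drop_non_json_values(context: dict) -> None:
    """Remove, in place, every entry whose value json.dumps() rejects."""
    for key in list(context):
        try:
            json.dumps(context[key])
        except (TypeError, ValueError):
            del context[key]


# An arbitrary object stands in for any value that cannot be serialized.
context = {"request_id": "3a8f", "user_id": 7, "handle": object()}
drop_non_json_values(context)
```

The helper mutates ``context`` in place, like the receiver above, so it could be called directly from a ``modify_context_before_task_publish`` receiver.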
bind_extra_task_metadata
------------------------
You can optionally connect to the ``bind_extra_task_metadata`` signal to bind more metadata to the logger or to override existing bound metadata. This is called in celery's ``receiver_task_pre_run``.
.. code-block:: python

    from django.dispatch import receiver
    from django_structlog.celery import signals
    import structlog

    @receiver(signals.bind_extra_task_metadata)
    def receiver_bind_extra_request_metadata(sender, signal, task=None, logger=None, **kwargs):
        structlog.contextvars.bind_contextvars(correlation_id=task.request.correlation_id)
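
When several request attributes are of interest, it may be convenient to collect only those that are actually set before binding them. The following is a rough sketch under stated assumptions: ``extract_task_metadata`` is a hypothetical helper, the attribute names are examples, and a ``SimpleNamespace`` stands in for ``task.request`` so the snippet runs without celery.

```python
from types import SimpleNamespace


def extract_task_metadata(request, names=("correlation_id", "retries")):
    """Return the named request attributes that are present and not None."""
    metadata = {}
    for name in names:
        value = getattr(request, name, None)
        if value is not None:
            metadata[name] = value
    return metadata


# Stand-in for ``task.request``; a real celery request has more attributes.
fake_request = SimpleNamespace(correlation_id="abc-123", retries=None)
metadata = extract_task_metadata(fake_request)

# Inside the receiver, the result would then be bound with:
# structlog.contextvars.bind_contextvars(**metadata)
```

Skipping ``None`` values keeps unset attributes out of the log output. Whether that is desirable depends on how the logs are consumed.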