File: myapp.py

package info (click to toggle)
celery 5.6.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,376 kB
  • sloc: python: 67,264; sh: 795; makefile: 378
file content (51 lines) | stat: -rw-r--r-- 1,806 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
"""myapp.py

This is a simple example of how to use the stamping feature.
It uses a custom stamping visitor to stamp a workflow with a unique
monitoring id stamp (per task), and a different visitor to stamp the last
task in the workflow. The last task is stamped with a consistent stamp, which
is used to revoke the task by its stamped header using two different approaches:
1. Run the workflow, then revoke the last task by its stamped header.
2. Revoke the last task by its stamped header before running the workflow.

Usage::

   # The worker service reacts to messages by executing tasks.
   (window1)$ celery -A myapp worker -l INFO

   # The shell service is used to run the example.
    (window2)$ celery -A myapp shell

   # Use (copy) the content of the examples modules to run the workflow via the
   # shell service.

   # Use one of demo runs via the shell service:
   # 1) run_then_revoke(): Run the workflow and revoke the last task
   #    by its stamped header during its run.
   # 2) revoke_then_run(): Revoke the last task by its stamped header
   #    before its run, then run the workflow.
   # 3) Any of the examples in examples.py
   #
   # See worker logs for output per defined in task_received_handler().
"""
import json

# Import tasks in worker context
import tasks  # noqa
from config import app

from celery.signals import task_received


@task_received.connect
def task_received_handler(sender=None, request=None, signal=None, **kwargs):
    print(f"In {signal.name} for: {repr(request)}")
    if hasattr(request, "stamped_headers") and request.stamped_headers:
        print(f"Found stamps: {request.stamped_headers}")
        print(json.dumps(request.stamps, indent=4, sort_keys=True))
    else:
        print("No stamps found")


if __name__ == "__main__":
    app.start()