File: tasks.py

package info (click to toggle)
celery 5.6.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,336 kB
  • sloc: python: 67,264; sh: 795; makefile: 378
file content (104 lines) | stat: -rw-r--r-- 3,311 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from time import sleep

from config import app
from visitors import FullVisitor, MonitoringIdStampingVisitor, MyStampingVisitor

from celery import Task
from celery.canvas import Signature, maybe_signature
from celery.utils.log import get_task_logger

# Module-level logger obtained from Celery's task-logger factory; used by
# log_demo() and by the on_replace() hooks defined in this module.
logger = get_task_logger(__name__)


def log_demo(running_task):
    """Log the stamps found on a running task and on each of its callbacks.

    Reads ``stamps`` and ``stamped_headers`` from ``running_task.request`` and
    logs them at CRITICAL level (or logs that the task runs without stamps).
    Then does the same for every entry in ``request.callbacks``, reading the
    stamp values from the callback signature's ``options``.

    Args:
        running_task: A bound task instance exposing ``request`` and ``name``.
    """
    request = running_task.request
    name = running_task.name + request.argsrepr

    # Default to empty values so that a request lacking these attributes (or
    # carrying None) is reported as "without stamps" instead of raising
    # UnboundLocalError further down.
    stamps = getattr(request, "stamps", None) or {}
    stamped_headers = getattr(request, "stamped_headers", None) or []

    if stamps and stamped_headers:
        logger.critical(f"Found {name}.stamps: {stamps}")
        logger.critical(f"Found {name}.stamped_headers: {stamped_headers}")
    else:
        logger.critical(f"Running {name} without stamps")

    for link in request.callbacks or []:
        link = maybe_signature(link)
        logger.critical(f"Found {name}.link: {link}")
        # Use link-specific names so the task-level values above are not
        # clobbered; tolerate an explicit None under "stamped_headers".
        link_stamped_headers = link.options.get("stamped_headers") or []
        link_stamps = {stamp: link.options[stamp] for stamp in link_stamped_headers}

        if link_stamps and link_stamped_headers:
            logger.critical(f"Found {name}.link stamps: {link_stamps}")
            logger.critical(f"Found {name}.link stamped_headers: {link_stamped_headers}")
        else:
            logger.critical(f"Running {name}.link without stamps")


class StampOnReplace(Task):
    """Task base class that stamps the replacement signature in ``on_replace``."""

    def on_replace(self, sig: Signature):
        """Stamp ``sig`` with two visitors, then delegate to the parent hook.

        The signature is stamped first with ``FullVisitor`` and then with
        ``MyStampingVisitor``; a warning-level line is logged before each step.
        """
        first_notice = f"StampOnReplace: {sig}.stamp(FullVisitor())"
        logger.warning(first_notice)
        full_visitor = FullVisitor()
        sig.stamp(full_visitor)

        # Formatted only now, so the message shows the signature as it is
        # after the first stamping pass.
        second_notice = f"StampOnReplace: {sig}.stamp(MyStampingVisitor())"
        logger.warning(second_notice)
        custom_visitor = MyStampingVisitor()
        sig.stamp(custom_visitor)

        return super().on_replace(sig)


class MonitoredTask(Task):
    """Task base class that stamps the replacement with a monitoring visitor."""

    def on_replace(self, sig: Signature):
        """Stamp ``sig`` with ``MonitoringIdStampingVisitor`` and delegate upward.

        The stamp call passes ``append_stamps=False``; a warning-level line is
        logged before stamping.
        """
        notice = f"MonitoredTask: {sig}.stamp(MonitoringIdStampingVisitor())"
        logger.warning(notice)
        monitoring_visitor = MonitoringIdStampingVisitor()
        sig.stamp(monitoring_visitor, append_stamps=False)
        return super().on_replace(sig)


@app.task(bind=True)
def identity_task(self, x):
    """Log this task's stamp information via ``log_demo`` and return ``x`` unchanged."""
    log_demo(self)
    return x


@app.task(bind=True, base=MonitoredTask)
def replaced_identity(self: MonitoredTask, x):
    """Log stamps, then replace this task with a pre-stamped ``identity_task``.

    The replacement signature is stamped here with
    ``MonitoringIdStampingVisitor`` before being handed to ``self.replace``.
    """
    log_demo(self)
    logger.warning("Stamping identity_task with MonitoringIdStampingVisitor() before replace")
    replacement = identity_task.s(x)
    # The stamps applied here are expected to be overridden by the ones
    # MonitoredTask.on_replace() applies during the replace.
    pre_replace_visitor = MonitoringIdStampingVisitor()
    replacement.stamp(pre_replace_visitor)
    return self.replace(replacement)


@app.task(bind=True, base=StampOnReplace)
def identity(self: Task, x):
    """Log stamps, then replace this task with ``replaced_identity`` for ``x``."""
    log_demo(self)
    replacement = replaced_identity.s(x)
    return self.replace(replacement)


@app.task
def mul(x: int, y: int) -> int:
    """Return the product of ``x`` and ``y``."""
    return x * y


@app.task
def xsum(numbers: list) -> int:
    """Return the sum of the items in ``numbers`` using the built-in ``sum``."""
    return sum(numbers)


@app.task
def waitfor(seconds: int) -> None:
    """Sleep for ``seconds`` seconds in one-second steps, printing progress.

    Prints an opening message, then a line with the elapsed count after each
    one-second sleep.
    """
    print(f"Waiting for {seconds} seconds...")
    # enumerate(..., start=1) gives the 1-based elapsed count directly.
    for elapsed, _ in enumerate(range(seconds), start=1):
        sleep(1)
        print(f"{elapsed} seconds passed")


@app.task(bind=True, base=StampOnReplace)
def wait_for_revoke(self: StampOnReplace, seconds: int) -> None:
    """Replace this task with a ``waitfor`` task that waits ``seconds`` seconds."""
    replacement = waitfor.s(seconds)
    self.replace(replacement)