File: revoke_example.py

package info (click to toggle)
celery 5.6.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,336 kB
  • sloc: python: 67,264; sh: 795; makefile: 378
file content (75 lines) | stat: -rw-r--r-- 2,548 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from time import sleep

from tasks import identity_task, mul, wait_for_revoke, xsum
from visitors import MonitoringIdStampingVisitor

from celery.canvas import Signature, chain, chord, group
from celery.result import AsyncResult


def create_canvas(n: int) -> Signature:
    """Creates a canvas to calculate: n * sum(1..n) * 10
    For example, if n = 3, the result is 3 * (1 + 2 + 3) * 10 = 180
    """
    canvas = chain(
        group(identity_task.s(i) for i in range(1, n+1)) | xsum.s(),
        chord(group(mul.s(10) for _ in range(1, n+1)), xsum.s()),
    )

    return canvas


def revoke_by_headers(result: AsyncResult, terminate: bool) -> None:
    """Revokes the last task in the workflow by its stamped header

    Arguments:
        result (AsyncResult): Can be either a frozen or a running result
        terminate (bool): If True, the revoked task will be terminated
    """
    result.revoke_by_stamped_headers({'mystamp': 'I am a stamp!'}, terminate=terminate)


def prepare_workflow() -> Signature:
    """Creates a canvas that waits "n * sum(1..n) * 10" in seconds,
    with n = 3.

    The canvas itself is stamped with a unique monitoring id stamp per task.
    The waiting task is stamped with different consistent stamp, which is used
    to revoke the task by its stamped header.
    """
    canvas = create_canvas(n=3)
    canvas = canvas | wait_for_revoke.s()
    canvas.stamp(MonitoringIdStampingVisitor())
    return canvas


def run_then_revoke():
    """Runs the workflow and lets the waiting task run for a while.
    Then, the waiting task is revoked by its stamped header.

    The expected outcome is that the canvas will be calculated to the end,
    but the waiting task will be revoked and terminated *during its run*.

    See worker logs for more details.
    """
    canvas = prepare_workflow()
    result = canvas.delay()
    print('Wait 5 seconds, then revoke the last task by its stamped header: "mystamp": "I am a stamp!"')
    sleep(5)
    print('Revoking the last task...')
    revoke_by_headers(result, terminate=True)


def revoke_then_run():
    """Revokes the waiting task by its stamped header before it runs.
    Then, run the workflow, which will not run the waiting task that was revoked.

    The expected outcome is that the canvas will be calculated to the end,
    but the waiting task will not run at all.

    See worker logs for more details.
    """
    canvas = prepare_workflow()
    result = canvas.freeze()
    revoke_by_headers(result, terminate=False)
    result = canvas.delay()