File: tasks.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (81 lines) | stat: -rw-r--r-- 1,888 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""Smoke tests tasks."""

from __future__ import annotations

import os
import sys
from signal import SIGKILL
from time import sleep

import celery.utils
from celery import Task, shared_task, signature
from celery.canvas import Signature
from t.integration.tasks import *  # noqa
from t.integration.tasks import replaced_with_me


@shared_task
def noop(*args, **kwargs) -> None:
    """Accept any arguments and forward them to ``celery.utils.noop``.

    Whatever ``celery.utils.noop`` returns is passed through unchanged.
    """
    return celery.utils.noop(*args, **kwargs)


@shared_task
def long_running_task(seconds: float = 1, verbose: bool = False) -> bool:
    """Keep the task busy by sleeping in one-second steps.

    Only whole seconds are slept: ``seconds`` is truncated with ``int()``,
    so any fractional remainder is ignored.

    Args:
        seconds: How long the task should stay busy.
        verbose: When true, log a progress line after each one-second sleep.

    Returns:
        Always ``True`` once the loop has finished.
    """
    from celery import current_task
    from celery.utils.log import get_task_logger

    log = get_task_logger(current_task.name)
    log.info("Starting long running task")

    whole_seconds = int(seconds)
    for tick in range(whole_seconds):
        sleep(1)
        if not verbose:
            continue
        log.info(f"Sleeping: {tick}")

    log.info("Finished long running task")
    return True


@shared_task(soft_time_limit=3, time_limit=5)
def soft_time_limit_lower_than_time_limit():
    """Sleep past the soft time limit while staying under the hard one.

    The task sleeps for 4 seconds and is declared with ``soft_time_limit=3``
    and ``time_limit=5``.
    """
    sleep(4)


@shared_task(soft_time_limit=5, time_limit=3)
def soft_time_limit_must_exceed_time_limit():
    """Do nothing; declared with a soft limit above the hard limit.

    The task is registered with ``soft_time_limit=5`` and ``time_limit=3``.
    NOTE(review): presumably exists so the smoke tests can check how this
    limit combination is handled -- confirm against the tests that use it.
    """
    pass


@shared_task(bind=True)
def replace_with_task(self: Task, replace_with: Signature | None = None):
    """Replace the currently running task with another signature.

    Args:
        replace_with: Signature to substitute for this task. When ``None``,
            ``replaced_with_me.s()`` is used instead.

    Returns:
        Whatever ``self.replace`` returns.
    """
    if replace_with is None:
        replace_with = replaced_with_me.s()
    # The argument is always passed through ``signature()``; presumably this
    # normalises a serialized (dict) signature -- confirm against callers.
    return self.replace(signature(replace_with))


@shared_task
def self_termination_sigkill():
    """Forcefully terminate the current process by sending it SIGKILL."""
    own_pid = os.getpid()
    os.kill(own_pid, SIGKILL)


@shared_task
def self_termination_system_exit():
    """Raise ``SystemExit`` with status 1 to simulate a critical stop of the Celery worker."""
    raise SystemExit(1)


@shared_task(time_limit=2)
def self_termination_delay_timeout():
    """Delays the execution to simulate a task timeout.

    The task sleeps for 4 seconds while being declared with ``time_limit=2``,
    so the sleep outlasts the configured limit.
    """
    sleep(4)


@shared_task
def self_termination_exhaust_memory():
    """Allocate memory without bound to simulate memory exhaustion.

    The loop has no exit condition: each pass builds another
    one-million-character string and keeps it referenced in a list.
    """
    chunk_length = 10**6
    hoard = []
    while True:
        hoard.append(" " * chunk_length)