File: task_termination.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (48 lines) | stat: -rw-r--r-- 1,855 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from __future__ import annotations

from enum import Enum, auto

from pytest_celery import CeleryTestWorker

from celery.canvas import Signature
from celery.result import AsyncResult
from t.smoke.tasks import (self_termination_delay_timeout, self_termination_exhaust_memory, self_termination_sigkill,
                           self_termination_system_exit)


class TaskTermination:
    """Helper for dispatching tasks that terminate themselves in various ways."""

    class Method(Enum):
        # One member per supported way for a task to bring itself down.
        SIGKILL = auto()
        SYSTEM_EXIT = auto()
        DELAY_TIMEOUT = auto()
        EXHAUST_MEMORY = auto()

    def apply_self_termination_task(
        self,
        worker: CeleryTestWorker,
        method: TaskTermination.Method,
    ) -> AsyncResult:
        """Send a self-terminating task to the given worker's queue.

        Args:
            worker (CeleryTestWorker): Worker whose ``worker_queue`` receives the task.
            method (TaskTermination.Method): How the task should terminate itself.

        Returns:
            AsyncResult: Handle returned by ``apply_async`` for the dispatched task.

        Raises:
            KeyError: If ``method`` has no entry in the dispatch table.
        """
        kinds = TaskTermination.Method
        try:
            # Dispatch table: one signature per termination method.
            signature_by_method = {
                kinds.SIGKILL: self_termination_sigkill.si(),
                kinds.SYSTEM_EXIT: self_termination_system_exit.si(),
                kinds.DELAY_TIMEOUT: self_termination_delay_timeout.si(),
                kinds.EXHAUST_MEMORY: self_termination_exhaust_memory.si(),
            }
            chosen: Signature = signature_by_method[method]

            return chosen.apply_async(queue=worker.worker_queue)
        finally:
            # Runs on success and on failure alike: if the terminating task
            # unexpectedly took the worker down with it, refreshing the container
            # object here exposes the up-to-date container status so a test can
            # assert on it (useful for dev/debug).
            worker.container.reload()