File: worker_kill.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (47 lines) | stat: -rw-r--r-- 1,346 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from __future__ import annotations

from enum import Enum, auto

from pytest_celery import CeleryTestWorker

from celery.app.control import Control


class WorkerKill:
    """Kills a worker in different ways."""

    class Method(Enum):
        DOCKER_KILL = auto()
        CONTROL_SHUTDOWN = auto()
        SIGTERM = auto()
        SIGQUIT = auto()

    def kill_worker(
        self,
        worker: CeleryTestWorker,
        method: WorkerKill.Method,
    ) -> None:
        """Kill a Celery worker.

        Args:
            worker (CeleryTestWorker): Worker to kill.
            method (WorkerKill.Method): The method to kill the worker.
        """
        if method == WorkerKill.Method.DOCKER_KILL:
            worker.kill()

            assert worker.container.status == "exited", (
                f"Worker container should be in 'exited' state after kill, "
                f"but is in '{worker.container.status}' state instead."
            )

        if method == WorkerKill.Method.CONTROL_SHUTDOWN:
            control: Control = worker.app.control
            control.shutdown(destination=[worker.hostname()])
            worker.container.reload()

        if method == WorkerKill.Method.SIGTERM:
            worker.kill(signal="SIGTERM")

        if method == WorkerKill.Method.SIGQUIT:
            worker.kill(signal="SIGQUIT")