File: worker_restart.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (42 lines) | stat: -rw-r--r-- 1,369 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from __future__ import annotations

from enum import Enum, auto

from pytest_celery import CeleryTestWorker


class WorkerRestart:
    """Restarts a worker in different ways."""

    class Method(Enum):
        """Available strategies for restarting a worker."""

        # Calls ``worker.app.control.pool_restart()`` and then reloads the container.
        POOL_RESTART = auto()
        # Calls ``worker.restart()``.
        DOCKER_RESTART_GRACEFULLY = auto()
        # Calls ``worker.restart(force=True)``.
        DOCKER_RESTART_FORCE = auto()

    def restart_worker(
        self,
        worker: CeleryTestWorker,
        method: WorkerRestart.Method,
        assertion: bool = True,
    ) -> None:
        """Restart a Celery worker.

        Args:
            worker (CeleryTestWorker): Worker to restart.
            method (WorkerRestart.Method): The method to restart the worker.
            assertion (bool, optional): Whether to assert the worker state after restart. Defaults to True.

        Raises:
            ValueError: If ``method`` is not one of the ``WorkerRestart.Method`` members.
            AssertionError: If ``assertion`` is true and the worker container
                status is not ``"running"`` after the restart.
        """
        if method == WorkerRestart.Method.POOL_RESTART:
            worker.app.control.pool_restart()
            worker.container.reload()
        elif method == WorkerRestart.Method.DOCKER_RESTART_GRACEFULLY:
            worker.restart()
        elif method == WorkerRestart.Method.DOCKER_RESTART_FORCE:
            worker.restart(force=True)
        else:
            # Without this guard an unknown method would restart nothing, and
            # the state check below would still pass on the untouched container.
            raise ValueError(f"Unsupported worker restart method: {method!r}")

        if assertion:
            # Read the status once so the checked value and the reported value agree.
            status = worker.container.status
            assert status == "running", (
                f"Worker container should be in 'running' state after restart, "
                f"but is in '{status}' state instead."
            )