File: test_prefork_shutdown.py

package info (click to toggle)
celery 5.6.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,376 kB
  • sloc: python: 67,264; sh: 795; makefile: 378
file content (89 lines) | stat: -rw-r--r-- 2,912 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""Integration tests for prefork pool shutdown behaviour.

These tests verify that the prefork pool gracefully shuts down and maintains
heartbeats during the shutdown process, preventing connection loss during
worker drain.
"""

from time import sleep

import pytest

from celery.contrib.testing.worker import start_worker
from celery.worker import state

from .tasks import sleeping

TEST_HEARTBEAT = 2

# Exceeds AMQP connection timeout (~4 seconds: broker closes after missing
# 2 consecutive 2-second heartbeats)
LONG_TASK_DURATION = 10

TIMEOUT = LONG_TASK_DURATION * 2


@pytest.fixture
def heartbeat_worker(celery_session_app):
    """Worker with short heartbeat for testing purposes."""

    # Temporarily lower heartbeat for this test
    original_heartbeat = celery_session_app.conf.broker_heartbeat
    celery_session_app.conf.broker_heartbeat = TEST_HEARTBEAT

    original_acks_late = celery_session_app.conf.task_acks_late
    celery_session_app.conf.task_acks_late = True

    with start_worker(
        celery_session_app,
        pool="prefork",
        without_heartbeat=False,
        concurrency=4,
        shutdown_timeout=TIMEOUT,
        perform_ping_check=False,
    ) as worker:
        # Verify that low heartbeat is configured correctly
        assert worker.consumer.amqheartbeat == TEST_HEARTBEAT

        yield worker

    celery_session_app.conf.broker_heartbeat = original_heartbeat
    celery_session_app.conf.task_acks_late = original_acks_late


class test_prefork_shutdown:
    """Test prefork shutdown with heartbeat maintenance."""

    # Test timeout should be longer than worker timeout
    @pytest.mark.timeout(timeout=TIMEOUT * 2)
    @pytest.mark.usefixtures("heartbeat_worker")
    def test_shutdown_with_long_running_tasks(self):
        """Test that graceful shutdown completes long-running tasks without
        connection loss.

        This test verifies that when the prefork pool is shutting down with
        long-running tasks, heartbeats continue to be sent to maintain the
        broker connection.

        - Heartbeat frames sent every 2 seconds
        - Connection closes after 4 seconds (two missed frames) without heartbeats
        - Tasks run 10 seconds to exceed 4-second threshold
        """

        # Submit multiple long-running tasks that will be active during shutdown
        num_tasks = 3
        results = []
        for _ in range(num_tasks):
            results.append(sleeping.delay(LONG_TASK_DURATION))

        # Give time for tasks to start executing
        sleep(1)

        state.should_stop = True

        # Wait for all tasks to complete. If heartbeats aren't maintained during
        # shutdown, this will fail with `ConnectionResetError`, `BrokenPipeError`
        # and `celery.exceptions.TimeoutError`.
        for result in results:
            result.get(timeout=TIMEOUT)
            assert result.status == "SUCCESS"