File: test_worker.py

package info (click to toggle)
celery 5.6.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,336 kB
  • sloc: python: 67,264; sh: 795; makefile: 378
file content (66 lines) | stat: -rw-r--r-- 2,369 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import subprocess

import pytest

from celery import Celery


def test_run_worker():
    """Check that ``celery worker`` exits with status 1 and the expected message.

    Runs the ``celery`` command line in a subprocess with
    ``--config t.integration.test_worker_config`` and expects it to fail.
    The captured output must contain the "Shutting down..." message that
    mentions ``app.conf.broker_connection_retry_on_startup=False``.
    """
    with pytest.raises(subprocess.CalledProcessError) as exc_info:
        # stderr is merged into stdout so the captured output holds every
        # message the worker printed, whichever stream it used.
        subprocess.check_output(
            ["celery", "--config", "t.integration.test_worker_config", "worker"],
            stderr=subprocess.STDOUT)

    called_process_error = exc_info.value
    # Decode leniently: the text is only used for matching and diagnostics,
    # so an undecodable byte must not hide the real assertion result.
    output = called_process_error.output.decode('utf-8', errors='replace')

    # Include the worker output in the message; the exception text alone
    # only reports the command line and the exit status.
    assert called_process_error.returncode == 1, \
        f"{called_process_error}\n{output}"

    expected_message = (
        "Retrying to establish a connection to the message broker after a connection "
        "loss has been disabled (app.conf.broker_connection_retry_on_startup=False). "
        "Shutting down..."
    )
    assert expected_message in output, output


def test_django_fixup_direct_worker(caplog, monkeypatch):
    """Test Django fixup by directly instantiating Celery worker without subprocess.

    Points ``DJANGO_SETTINGS_MODULE`` at ``t.integration.test_django_settings``,
    sets up Django, builds a Celery app configured from the Django settings
    and creates a worker with the ``solo`` pool. The test then checks that
    ``worker.pool_cls`` is an object with a ``__module__`` attribute and that
    the captured logs contain no recursion or ``AttributeError`` messages.

    Args:
        caplog: pytest fixture used to capture log records at DEBUG level.
        monkeypatch: pytest fixture used to set the environment variable.
    """
    # Imported locally, as in the original test, so importing this module
    # does not itself require Django.
    import logging

    import django

    # Set logging level to capture debug messages
    caplog.set_level(logging.DEBUG)

    # Configure Django settings
    monkeypatch.setenv('DJANGO_SETTINGS_MODULE', 't.integration.test_django_settings')
    django.setup()

    # Create Celery app with Django integration
    app = Celery('test_django_direct')
    app.config_from_object('django.conf:settings', namespace='CELERY')
    app.autodiscover_tasks()

    # Test that we can access worker configuration without recursion errors
    # This should trigger the Django fixup initialization
    worker = app.Worker(
        pool='solo',
        concurrency=1,
        loglevel='debug'
    )

    # Accessing pool_cls should not cause AttributeError
    pool_cls = worker.pool_cls
    assert pool_cls is not None

    # Verify pool_cls has __module__ attribute (should be a class, not a string)
    assert hasattr(pool_cls, '__module__'), \
        f"pool_cls should be a class with __module__, got {type(pool_cls)}: {pool_cls}"

    # Capture and check logs
    log_output = caplog.text

    # Verify no recursion-related errors in logs
    assert "RecursionError" not in log_output, f"RecursionError found in logs:\n{log_output}"
    assert "maximum recursion depth exceeded" not in log_output, \
        f"Recursion depth error found in logs:\n{log_output}"

    # Match the bare message without a trailing period: the interpreter only
    # appends ". Did you mean: ...?" when it has an attribute suggestion, so a
    # pattern ending in "." would miss the plain form of the error.
    assert "AttributeError: 'str' object has no attribute '__module__'" not in log_output, \
        f"AttributeError found in logs:\n{log_output}"