1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
from unittest.mock import Mock, patch
import pytest
from celery import bootsteps
from celery.worker.loops import synloop
def test_synloop_perform_pending_operations_on_system_exit():
# Mock dependencies
obj = Mock()
connection = Mock()
consumer = Mock()
blueprint = Mock()
hub = Mock()
qos = Mock()
heartbeat = Mock()
clock = Mock()
# Set up the necessary attributes
obj.create_task_handler.return_value = Mock()
obj.perform_pending_operations = Mock()
obj.on_ready = Mock()
obj.pool.is_green = False
obj.connection = True
blueprint.state = bootsteps.RUN # Simulate RUN state
qos.prev = qos.value = Mock()
# Mock state.maybe_shutdown to raise SystemExit
with patch("celery.worker.loops.state") as mock_state:
mock_state.maybe_shutdown.side_effect = SystemExit
# Call synloop and expect SystemExit to be raised
with pytest.raises(SystemExit):
synloop(
obj,
connection,
consumer,
blueprint,
hub,
qos,
heartbeat,
clock,
hbrate=2.0,
)
# Assert that perform_pending_operations was called even after SystemExit
obj.perform_pending_operations.assert_called_once()
# Assert that connection.drain_events was called
connection.drain_events.assert_called_with(timeout=2.0)
# Assert other important method calls
obj.on_ready.assert_called_once()
consumer.consume.assert_called_once()
|