File: test_loops.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (57 lines) | stat: -rw-r--r-- 1,581 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from unittest.mock import Mock, patch

import pytest

from celery import bootsteps
from celery.worker.loops import synloop


def test_synloop_perform_pending_operations_on_system_exit():
    """Check that synloop performs pending operations when shutdown exits.

    ``state`` in ``celery.worker.loops`` is patched so that
    ``maybe_shutdown`` raises ``SystemExit``. The test expects that exception
    to propagate out of ``synloop`` and then verifies that pending operations
    were performed, events were drained, and the startup hooks were invoked.
    """
    # Worker-like object handed to synloop as its first argument.
    worker = Mock(connection=True)
    worker.pool.is_green = False
    worker.create_task_handler.return_value = Mock()
    worker.perform_pending_operations = Mock()
    worker.on_ready = Mock()

    # The blueprint reports the RUN state.
    blueprint = Mock(state=bootsteps.RUN)

    # ``prev`` and ``value`` are the very same object, so they compare equal.
    shared_qos_value = Mock()
    qos = Mock(prev=shared_qos_value, value=shared_qos_value)

    # Remaining collaborators need no special configuration.
    connection, consumer, hub, heartbeat, clock = (Mock() for _ in range(5))

    with patch("celery.worker.loops.state") as fake_state, pytest.raises(SystemExit):
        # Make the shutdown check raise as soon as synloop invokes it.
        fake_state.maybe_shutdown.side_effect = SystemExit
        synloop(
            worker,
            connection,
            consumer,
            blueprint,
            hub,
            qos,
            heartbeat,
            clock,
            hbrate=2.0,
        )

    # Pending operations must have run exactly once despite the SystemExit.
    worker.perform_pending_operations.assert_called_once()

    # Events must have been drained with the expected timeout.
    connection.drain_events.assert_called_with(timeout=2.0)

    # Startup hooks must each have fired exactly once.
    worker.on_ready.assert_called_once()
    consumer.consume.assert_called_once()