File: test_celery_signals.py

package info (click to toggle)
python-django-guid 3.5.2-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 664 kB
  • sloc: python: 1,267; makefile: 16
file content (192 lines) | stat: -rw-r--r-- 8,075 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import logging
from copy import deepcopy

from django.conf import settings as django_settings
from django.test import override_settings

from pytest_mock import MockerFixture

from django_guid import get_guid, set_guid
from django_guid.config import Settings
from django_guid.integrations import CeleryIntegration
from django_guid.integrations.celery.context import celery_current, celery_parent
from django_guid.integrations.celery.signals import (
    clean_up,
    parent_header,
    publish_task_from_worker_or_request,
    set_transaction_id,
    worker_prerun,
)
from django_guid.utils import generate_guid


def test_task_publish_includes_correct_headers(monkeypatch):
    """
    Publishing a task must copy the current GUID into the outgoing headers;
    without that header no state is transferred to the celery worker pool.
    """
    # Settings with the Celery integration enabled and parent logging turned off
    guid_config = deepcopy(django_settings.DJANGO_GUID)
    guid_config['INTEGRATIONS'] = [CeleryIntegration(log_parent=False)]
    with override_settings(DJANGO_GUID=guid_config):
        patched_settings = Settings()
        monkeypatch.setattr('django_guid.integrations.celery.signals.settings', patched_settings)

        for expected_id in (None, 'test', 123, -1):
            # Store the ID in the context var, then publish with empty headers
            set_guid(expected_id)
            outgoing_headers = {}
            publish_task_from_worker_or_request(headers=outgoing_headers)

            # The signal mutates the dict in place; the GUID header must match
            assert outgoing_headers[patched_settings.guid_header_name] == expected_id


def test_task_publish_includes_correct_depth_headers(monkeypatch):
    """
    With log_parent=True, the parent header is only published when a
    current celery ID has been set.
    """
    guid_config = deepcopy(django_settings.DJANGO_GUID)
    guid_config['INTEGRATIONS'] = [CeleryIntegration(log_parent=True)]
    with override_settings(DJANGO_GUID=guid_config):
        patched_settings = Settings()
        monkeypatch.setattr('django_guid.integrations.celery.signals.settings', patched_settings)

        # celery_current is expected to be unset at this point,
        # so the parent header must be absent
        initial_headers = {}
        publish_task_from_worker_or_request(headers=initial_headers)
        assert parent_header not in initial_headers

        for current_id in ('test', 123, -1):
            celery_current.set(current_id)
            published = {}
            publish_task_from_worker_or_request(headers=published)
            # Now the celery-parent-id header should carry the current ID
            assert published[parent_header] == current_id


def test_worker_prerun_guid_exists(monkeypatch, mocker: MockerFixture, two_unique_uuid4):
    """
    Tests that worker_prerun adopts the Correlation-ID already present on the task request.
    """
    incoming_guid = '704ae5472cae4f8daa8f2cc5a5a8mock'
    task = mocker.Mock()
    task.request = {'Correlation-ID': incoming_guid}

    guid_config = deepcopy(django_settings.DJANGO_GUID)
    guid_config['INTEGRATIONS'] = [CeleryIntegration(log_parent=False)]
    with override_settings(DJANGO_GUID=guid_config):
        patched_settings = Settings()
        monkeypatch.setattr('django_guid.integrations.celery.signals.settings', patched_settings)
        worker_prerun(task)

    # The GUID in context must be the one carried by the request
    assert get_guid() == incoming_guid


def test_worker_prerun_guid_does_not_exist(monkeypatch, mocker: MockerFixture, mock_uuid):
    """
    Tests that worker_prerun sets a GUID when the task request carries none.
    """
    task = mocker.Mock()
    task.request = {'Correlation-ID': None}

    guid_config = deepcopy(django_settings.DJANGO_GUID)
    guid_config['INTEGRATIONS'] = [CeleryIntegration(log_parent=False)]
    with override_settings(DJANGO_GUID=guid_config):
        patched_settings = Settings()
        monkeypatch.setattr('django_guid.integrations.celery.signals.settings', patched_settings)
        worker_prerun(task)

    # Expected value presumably comes from the mock_uuid fixture - defined outside this file
    generated_guid = '704ae5472cae4f8daa8f2cc5a5a8mock'
    assert get_guid() == generated_guid


def test_worker_prerun_guid_log_parent_no_origin(monkeypatch, mocker: MockerFixture, mock_uuid_two_unique):
    """
    Tests that depth works when there is no origin

    The task request carries neither a correlation ID nor a parent ID. After
    worker_prerun the GUID and the current celery ID must both be set (the
    expected values presumably come from the mock_uuid_two_unique fixture,
    defined outside this file) and the parent ID must remain None.
    """
    mock_task = mocker.Mock()
    # `parent_header` is the module-level import; no local re-import is needed
    mock_task.request = {'Correlation-ID': None, parent_header: None}  # No origin
    mocked_settings = deepcopy(django_settings.DJANGO_GUID)
    mocked_settings['INTEGRATIONS'] = [CeleryIntegration(log_parent=True)]
    with override_settings(DJANGO_GUID=mocked_settings):
        settings = Settings()
        monkeypatch.setattr('django_guid.integrations.celery.signals.settings', settings)
        worker_prerun(mock_task)
    assert get_guid() == '704ae5472cae4f8daa8f2cc5a5a8mock'
    assert celery_current.get() == 'c494886651cd4baaa8654e4d24a8mock'
    assert celery_parent.get() is None


def test_worker_prerun_guid_log_parent_with_origin(monkeypatch, mocker: MockerFixture, mock_uuid_two_unique):
    """
    Tests that depth works when there is an origin

    The task request carries a parent ID but no correlation ID. After
    worker_prerun the GUID and the current celery ID must both be set (the
    expected values presumably come from the mock_uuid_two_unique fixture,
    defined outside this file) and the parent ID must equal the one from the
    request.
    """
    mock_task = mocker.Mock()
    # `parent_header` is the module-level import; no local re-import is needed
    mock_task.request = {'Correlation-ID': None, parent_header: '1234'}  # Origin (parent ID) present
    mocked_settings = deepcopy(django_settings.DJANGO_GUID)
    mocked_settings['INTEGRATIONS'] = [CeleryIntegration(log_parent=True)]
    with override_settings(DJANGO_GUID=mocked_settings):
        settings = Settings()
        monkeypatch.setattr('django_guid.integrations.celery.signals.settings', settings)
        worker_prerun(mock_task)
    assert get_guid() == '704ae5472cae4f8daa8f2cc5a5a8mock'
    assert celery_current.get() == 'c494886651cd4baaa8654e4d24a8mock'
    assert celery_parent.get() == '1234'


def test_cleanup(monkeypatch, mocker: MockerFixture):
    """
    Test that clean_up resets the GUID and both celery context vars to None
    """
    # Populate all three context values so the reset is observable
    set_guid('123')
    for context_var in (celery_current, celery_parent):
        context_var.set('123')

    guid_config = deepcopy(django_settings.DJANGO_GUID)
    guid_config['INTEGRATIONS'] = [CeleryIntegration(log_parent=True)]
    with override_settings(DJANGO_GUID=guid_config):
        patched_settings = Settings()
        monkeypatch.setattr('django_guid.integrations.celery.signals.settings', patched_settings)
        clean_up(task=mocker.Mock())

    leftover_state = [get_guid(), celery_current.get(), celery_parent.get()]
    assert leftover_state == [None, None, None]


def test_set_transaction_id(monkeypatch, caplog):
    """
    Tests that set_transaction_id logs the Sentry transaction_id message,
    given `sentry_integration=True` in CeleryIntegration
    """
    # https://github.com/eisensheng/pytest-catchlog/issues/44
    logger = logging.getLogger('django_guid.celery')  # Ensure caplog can catch logs with `propagate=False`
    logger.addHandler(caplog.handler)
    try:
        mocked_settings = deepcopy(django_settings.DJANGO_GUID)
        mocked_settings['INTEGRATIONS'] = [CeleryIntegration(sentry_integration=True)]
        with override_settings(DJANGO_GUID=mocked_settings):
            settings = Settings()
            monkeypatch.setattr('django_guid.integrations.celery.signals.settings', settings)
            guid = generate_guid()
            set_transaction_id(guid)
    finally:
        # Always detach the handler, even if the body raised, so it cannot
        # stay attached to the shared logger and leak into later tests
        logger.removeHandler(caplog.handler)
    assert f'Setting Sentry transaction_id to {guid}' in [record.message for record in caplog.records]


def test_dont_set_transaction_id(monkeypatch, caplog):
    """
    Tests that set_transaction_id does not log the Sentry transaction_id
    message, given `sentry_integration=False` in CeleryIntegration
    """
    # Attach caplog's handler directly to the logger so its records are captured
    logger = logging.getLogger('django_guid.celery')
    logger.addHandler(caplog.handler)
    try:
        mocked_settings = deepcopy(django_settings.DJANGO_GUID)
        mocked_settings['INTEGRATIONS'] = [CeleryIntegration(sentry_integration=False)]
        with override_settings(DJANGO_GUID=mocked_settings):
            settings = Settings()
            monkeypatch.setattr('django_guid.integrations.celery.signals.settings', settings)
            guid = generate_guid()
            set_transaction_id(guid)
    finally:
        # Always detach the handler, even if the body raised, so it cannot
        # stay attached to the shared logger and leak into later tests
        logger.removeHandler(caplog.handler)
    assert f'Setting Sentry transaction_id to {guid}' not in [record.message for record in caplog.records]