File: test_celery.py

package info (click to toggle)
python-django-structlog 9.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,004 kB
  • sloc: python: 3,509; sh: 206; javascript: 79; makefile: 19
file content (77 lines) | stat: -rw-r--r-- 2,295 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import pytest

from .. import celery

pytestmark = pytest.mark.django_db


class TestSuccessfulTask:
    def test(self, caplog):
        celery.successful_task(foo="bar")
        assert len(caplog.records) == 1
        record = caplog.records[0]
        assert record.msg["event"] == "This is a successful task"


class TestFailingTask:
    def test(self):
        with pytest.raises(Exception) as e:
            celery.failing_task(foo="bar")
        assert str(e.value) == "This is a failed task"


class TestNestingTask:
    def test(self, caplog):
        celery.nesting_task()
        assert len(caplog.records) == 1
        record = caplog.records[0]
        assert record.msg["event"] == "This is a nesting task"


class TestNestedTask:
    def test(self, caplog):
        celery.nested_task()
        assert len(caplog.records) == 1
        record = caplog.records[0]
        assert record.msg["event"] == "This is a nested task"


class TestScheduledTask:
    def test(self, caplog):
        celery.scheduled_task()
        assert len(caplog.records) == 1
        record = caplog.records[0]
        assert record.msg["event"] == "This is a scheduled task"


class TestRejectedTask:
    def test(self):
        assert celery.rejected_task() is None


class TestCorruptRejectedTask:
    def test(self, caplog):
        task_id = "11111111-1111-1111-1111-111111111111"
        headers = dict(
            id=task_id,
            task="django_structlog_demo_project.taskapp.celery.rejected_task",
        )
        celery.corrupt_rejected_task(sender=None, headers=headers)
        assert len(caplog.records) == 1
        record = caplog.records[0]
        assert record.msg["event"] == "corrupting rejected_task"
        assert record.msg["task_id"] == task_id
        assert "task" not in headers

    def test_other_tasks_not_corrupted(self, caplog):
        task_id = "11111111-1111-1111-1111-111111111111"
        headers = dict(
            id=task_id,
            task="django_structlog_demo_project.taskapp.celery.successful_task",
        )
        celery.corrupt_rejected_task(sender=None, headers=headers)
        assert len(caplog.records) == 0
        assert (
            headers["task"]
            == "django_structlog_demo_project.taskapp.celery.successful_task"
        )