File: test_prometheus_metrics.py

package info (click to toggle)
django-rq 3.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 648 kB
  • sloc: python: 3,184; makefile: 7
file content (135 lines) | stat: -rw-r--r-- 5,398 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
from unittest import skipIf
from unittest.mock import patch

from django.contrib.auth.models import User
from django.test import TestCase, override_settings
from django.test.client import Client
from django.urls import NoReverseMatch, reverse

from django_rq import get_queue
from django_rq.workers import get_worker

from .fixtures import access_self, failing_job

try:
    import prometheus_client
except ImportError:
    prometheus_client = None

RQ_QUEUES = {
    'default': {
        'HOST': os.environ.get('REDIS_HOST', 'localhost'),
        'PORT': 6379,
        'DB': 0,
    },
}


@skipIf(prometheus_client is None, 'prometheus_client is required')
@override_settings(RQ={'AUTOCOMMIT': True})
class PrometheusTest(TestCase):
    def setUp(self):
        self.user = User.objects.create_user('foo', password='pass')
        self.user.is_staff = True
        self.user.is_active = True
        self.user.save()
        self.client = Client()
        self.client.force_login(self.user)
        get_queue('default').connection.flushall()

    def assertMetricsContain(self, lines):
        response = self.client.get(reverse('rq_metrics'))
        self.assertEqual(response.status_code, 200)
        self.assertLessEqual(
            lines, set(response.content.decode('utf-8').splitlines())
        )

    @patch('django_rq.settings.QUEUES', RQ_QUEUES)
    def test_metrics_default(self):
        self.assertMetricsContain(
            {
                '# HELP rq_jobs RQ jobs by status',
                'rq_jobs{queue="default",status="queued"} 0.0',
                'rq_jobs{queue="default",status="started"} 0.0',
                'rq_jobs{queue="default",status="finished"} 0.0',
                'rq_jobs{queue="default",status="failed"} 0.0',
                'rq_jobs{queue="default",status="deferred"} 0.0',
                'rq_jobs{queue="default",status="scheduled"} 0.0',
            }
        )

    @patch('django_rq.settings.QUEUES', RQ_QUEUES)
    def test_metrics_with_jobs(self):
        queue = get_queue('default')
        queue.enqueue(failing_job)

        for _ in range(10):
            queue.enqueue(access_self)

        worker = get_worker('default', name='test_worker')
        worker.register_birth()

        # override worker registration to effectively simulate non burst mode
        register_death = worker.register_death
        worker.register_birth = worker.register_death = lambda: None  # type: ignore[method-assign]

        try:
            self.assertMetricsContain(
                {
                    # job information
                    '# HELP rq_jobs RQ jobs by status',
                    'rq_jobs{queue="default",status="queued"} 11.0',
                    'rq_jobs{queue="default",status="started"} 0.0',
                    'rq_jobs{queue="default",status="finished"} 0.0',
                    'rq_jobs{queue="default",status="failed"} 0.0',
                    'rq_jobs{queue="default",status="deferred"} 0.0',
                    'rq_jobs{queue="default",status="scheduled"} 0.0',
                    # worker information
                    '# HELP rq_workers RQ workers',
                    'rq_workers{name="test_worker",queues="default",state="?"} 1.0',
                    '# HELP rq_job_successful_total RQ successful job count',
                    'rq_job_successful_total{name="test_worker",queues="default"} 0.0',
                    '# HELP rq_job_failed_total RQ failed job count',
                    'rq_job_failed_total{name="test_worker",queues="default"} 0.0',
                    '# HELP rq_working_seconds_total RQ total working time',
                    'rq_working_seconds_total{name="test_worker",queues="default"} 0.0',
                }
            )

            worker.work(burst=True, max_jobs=4)
            self.assertMetricsContain(
                {
                    # job information
                    'rq_jobs{queue="default",status="queued"} 7.0',
                    'rq_jobs{queue="default",status="finished"} 3.0',
                    'rq_jobs{queue="default",status="failed"} 1.0',
                    # worker information
                    'rq_workers{name="test_worker",queues="default",state="idle"} 1.0',
                    'rq_job_successful_total{name="test_worker",queues="default"} 3.0',
                    'rq_job_failed_total{name="test_worker",queues="default"} 1.0',
                }
            )

            worker.work(burst=True)
            self.assertMetricsContain(
                {
                    # job information
                    'rq_jobs{queue="default",status="queued"} 0.0',
                    'rq_jobs{queue="default",status="finished"} 10.0',
                    'rq_jobs{queue="default",status="failed"} 1.0',
                    # worker information
                    'rq_workers{name="test_worker",queues="default",state="idle"} 1.0',
                    'rq_job_successful_total{name="test_worker",queues="default"} 10.0',
                    'rq_job_failed_total{name="test_worker",queues="default"} 1.0',
                }
            )
        finally:
            register_death()


@skipIf(prometheus_client is not None, 'prometheus_client is installed')
class NoPrometheusTest(TestCase):
    def test_no_metrics_without_prometheus_client(self):
        with self.assertRaises(NoReverseMatch):
            reverse('rq_metrics')