File: test_autodiscover.py

package info (click to toggle)
python-django-health-check 3.20.0-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 428 kB
  • sloc: python: 1,886; makefile: 6
file content (29 lines) | stat: -rw-r--r-- 1,152 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from celery import current_app
from django.conf import settings

from health_check.contrib.celery.backends import CeleryHealthCheck
from health_check.contrib.celery_ping.backends import CeleryPingHealthCheck
from health_check.plugins import plugin_dir


class TestAutoDiscover:
    """Check that the plugin registry matches the configured apps and queues."""

    def test_autodiscover(self):
        """Compare the non-celery registry entries with the installed health_check apps.

        The expected count is the number of ``health_check.*`` entries in
        ``INSTALLED_APPS`` (celery ones excluded) plus one per configured
        database, since each database is expected to contribute its own
        health check to the registry.
        """
        installed_check_apps = [
            app_name
            for app_name in settings.INSTALLED_APPS
            if app_name.startswith("health_check.") and "celery" not in app_name
        ]

        # Celery-based checks are counted separately, so leave them out here.
        celery_backends = (CeleryHealthCheck, CeleryPingHealthCheck)
        registered_checks = [
            entry for entry in plugin_dir._registry if not issubclass(entry[0], celery_backends)
        ]

        # Installed non-celery apps plus one check per database should account
        # for every non-celery plugin in the registry.
        expected_total = len(installed_check_apps) + len(settings.DATABASES)
        assert len(registered_checks) == expected_total

    def test_discover_celery_queues(self):
        """Expect one registered ``CeleryHealthCheck`` subclass per Celery queue."""
        celery_check_count = sum(
            1 for entry in plugin_dir._registry if issubclass(entry[0], CeleryHealthCheck)
        )
        assert celery_check_count == len(current_app.amqp.queues)