File: backends.py

package info (click to toggle)
python-django-health-check 3.20.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 428 kB
  • sloc: python: 1,886; makefile: 6
file content (27 lines) | stat: -rw-r--r-- 1,060 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import logging

from django.conf import settings
from django.db import DEFAULT_DB_ALIAS, DatabaseError, connections
from django.db.migrations.executor import MigrationExecutor

from health_check.backends import BaseHealthCheckBackend
from health_check.exceptions import ServiceUnavailable

logger = logging.getLogger(__name__)


class MigrationsHealthCheck(BaseHealthCheckBackend):
    def get_migration_plan(self, executor):
        return executor.migration_plan(executor.loader.graph.leaf_nodes())

    def check_status(self):
        db_alias = getattr(settings, "HEALTHCHECK_MIGRATIONS_DB", DEFAULT_DB_ALIAS)
        try:
            executor = MigrationExecutor(connections[db_alias])
            plan = self.get_migration_plan(executor)
            if plan:
                self.add_error(ServiceUnavailable("There are migrations to apply"))
        except DatabaseError as e:
            self.add_error(ServiceUnavailable("Database is not ready"), e)
        except Exception as e:
            self.add_error(ServiceUnavailable("Unexpected error"), e)