File: backends.py

package info (click to toggle)
python-django-health-check 3.20.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 428 kB
  • sloc: python: 1,886; makefile: 6
file content (20 lines) | stat: -rw-r--r-- 745 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from django.db import connection

from health_check.backends import BaseHealthCheckBackend
from health_check.exceptions import ServiceUnavailable


class DatabaseHeartBeatCheck(BaseHealthCheckBackend):
    """Health check that runs a simple SELECT 1; query to test if the database connection is alive."""

    def check_status(self):
        try:
            result = None
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1;")
                result = cursor.fetchone()

            if result != (1,):
                raise ServiceUnavailable("Health Check query did not return the expected result.")
        except Exception as e:
            raise ServiceUnavailable(f"Database health check failed: {e}")