File: backends.py

package info (click to toggle)
python-django-health-check 3.20.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 428 kB
  • sloc: python: 1,886; makefile: 6
file content (32 lines) | stat: -rw-r--r-- 1,071 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from django.db import DatabaseError, IntegrityError, transaction

from health_check.backends import BaseHealthCheckBackend
from health_check.exceptions import ServiceReturnedUnexpectedResult, ServiceUnavailable

from .models import TestModel


class DatabaseBackend(BaseHealthCheckBackend):
    database_name = ""

    def __init__(self, database_name: str = "") -> None:
        super().__init__()
        self.database_name = database_name or None

    def check_status(self):
        try:
            with transaction.atomic(using=self.database_name):
                obj = TestModel.objects.create(title="test")
                obj.title = "newtest"
                obj.save()
                obj.delete()
        except IntegrityError:
            raise ServiceReturnedUnexpectedResult("Integrity Error")
        except DatabaseError:
            raise ServiceUnavailable("Database error")

    def identifier(self) -> str:
        if not self.database_name:
            return super().identifier()

        return f"{self.__class__.__name__}[{self.database_name}]"