1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
from django.db import DatabaseError, IntegrityError, transaction
from health_check.backends import BaseHealthCheckBackend
from health_check.exceptions import ServiceReturnedUnexpectedResult, ServiceUnavailable
from .models import TestModel
class DatabaseBackend(BaseHealthCheckBackend):
database_name = ""
def __init__(self, database_name: str = "") -> None:
super().__init__()
self.database_name = database_name or None
def check_status(self):
try:
with transaction.atomic(using=self.database_name):
obj = TestModel.objects.create(title="test")
obj.title = "newtest"
obj.save()
obj.delete()
except IntegrityError:
raise ServiceReturnedUnexpectedResult("Integrity Error")
except DatabaseError:
raise ServiceUnavailable("Database error")
def identifier(self) -> str:
if not self.database_name:
return super().identifier()
return f"{self.__class__.__name__}[{self.database_name}]"
|