1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
import asyncio
from typing import TYPE_CHECKING, Set, Collection, Mapping, Dict, Any, Optional
from itertools import chain
from ..const import Status
from ..utils import _service_name
from ..server import Stream
from .v1.health_pb2 import HealthCheckRequest, HealthCheckResponse
from .v1.health_grpc import HealthBase
if TYPE_CHECKING:
from .check import CheckBase # noqa
from .._typing import ICheckable # noqa
def _status(
    checks: Set['CheckBase'],
) -> 'HealthCheckResponse.ServingStatus.ValueType':
    """Fold the statuses of several checks into one serving status.

    Each check is asked for its current ``__status__()`` and the distinct
    answers are compared as a set:

    - only ``None`` answers -> ``UNKNOWN``
    - only ``True`` answers -> ``SERVING``
    - anything else (a mix of answers, any other value, or no checks at
      all) -> ``NOT_SERVING``
    """
    observed = {item.__status__() for item in checks}
    if observed == {None}:
        return HealthCheckResponse.UNKNOWN
    if observed == {True}:
        return HealthCheckResponse.SERVING
    return HealthCheckResponse.NOT_SERVING
def _reset_waits(
events: Collection[asyncio.Event],
waits: Mapping[asyncio.Event, 'asyncio.Task[bool]'],
) -> Dict[asyncio.Event, 'asyncio.Task[bool]']:
new_waits = {}
for event in events:
wait = waits.get(event)
if wait is None or wait.done():
event.clear()
wait = asyncio.ensure_future(event.wait())
new_waits[event] = wait
return new_waits
class _Overall:
# `_service_name` should return '' (empty string) for this service
def __mapping__(self) -> Dict[str, Any]:
return {'//': None}
#: Represents overall health status of all services. It can be used as a key
#: of the ``checks`` mapping given to ``Health``; when that key is absent,
#: ``Health`` fills it in from the checks of all the listed services.
OVERALL = _Overall()
# Type of the ``checks`` argument of ``Health``: every key (a service or
# ``OVERALL``) maps to the collection of checks describing its health.
_ChecksConfig = Mapping['ICheckable', Collection['CheckBase']]
class Health(HealthBase):
    """Health-checking service

    Serves the ``Check`` and ``Watch`` methods on top of the checks
    configured for every service.

    Example:

    .. code-block:: python3

        from grpclib.health.service import Health

        auth = AuthService()
        billing = BillingService()

        health = Health({
            auth: [redis_status],
            billing: [db_check],
        })

        server = Server([auth, billing, health])

    """

    def __init__(self, checks: Optional[_ChecksConfig] = None) -> None:
        """Index the configured checks by service name.

        :param checks: mapping of a service (or ``OVERALL``) to the checks
            describing its health. When omitted, only the overall status is
            registered, without any checks. When given without an
            ``OVERALL`` key, the overall status is backed by the checks of
            all the listed services.
        """
        if checks is None:
            checks = {OVERALL: []}
        elif OVERALL not in checks:
            # Work on a copy, the caller's mapping must not gain a new key
            checks = dict(checks)
            checks[OVERALL] = list(chain.from_iterable(checks.values()))

        self._checks = {_service_name(s): set(check_list)
                        for s, check_list in checks.items()}

    async def Check(
        self,
        stream: Stream[HealthCheckRequest, HealthCheckResponse],
    ) -> None:
        """Implements synchronous periodic checks

        - service which was not configured: the call ends with the
          ``NOT_FOUND`` status
        - service configured without checks: ``SERVING`` is sent
        - otherwise every check of the service is executed, one after
          another, and their combined status is sent
        """
        request = await stream.recv_message()
        assert request is not None
        checks = self._checks.get(request.service)
        if checks is None:
            await stream.send_trailing_metadata(status=Status.NOT_FOUND)
        elif not checks:
            await stream.send_message(HealthCheckResponse(
                status=HealthCheckResponse.SERVING,
            ))
        else:
            for check in checks:
                await check.__check__()
            await stream.send_message(HealthCheckResponse(
                status=_status(checks),
            ))

    async def Watch(
        self,
        stream: Stream[HealthCheckRequest, HealthCheckResponse],
    ) -> None:
        """Streams the status of a service until the call is terminated

        - service which was not configured: ``SERVICE_UNKNOWN`` is sent and
          the call is kept open
        - service configured without checks: ``SERVING`` is sent and the
          call is kept open
        - otherwise the handler subscribes to every check of the service,
          sends the current combined status, and sends it again each time
          one of the subscribed events is set

        This coroutine never returns normally; subscriptions and pending
        wait tasks are released when it is interrupted by an exception or
        a cancellation.
        """
        request = await stream.recv_message()
        assert request is not None
        checks = self._checks.get(request.service)
        if checks is None:
            await stream.send_message(HealthCheckResponse(
                status=HealthCheckResponse.SERVICE_UNKNOWN,
            ))
            # Nothing more to report, just keep the call open
            while True:
                await asyncio.sleep(3600)
        elif not checks:
            await stream.send_message(HealthCheckResponse(
                status=HealthCheckResponse.SERVING,
            ))
            # Status can't change without checks, just keep the call open
            while True:
                await asyncio.sleep(3600)
        else:
            # (check, event) pairs are recorded one by one inside of the
            # ``try`` block, so the cleanup releases exactly what was
            # acquired, even when subscribing fails half-way through
            subscriptions = []
            waits = {}  # type: Dict[asyncio.Event, 'asyncio.Task[bool]']
            try:
                for check in checks:
                    event = await check.__subscribe__()
                    subscriptions.append((check, event))
                events = [event for _, event in subscriptions]
                waits = _reset_waits(events, waits)
                await stream.send_message(HealthCheckResponse(
                    status=_status(checks),
                ))
                while True:
                    await asyncio.wait(waits.values(),
                                       return_when=asyncio.FIRST_COMPLETED)
                    # Re-arm the events which fired before reporting
                    waits = _reset_waits(events, waits)
                    await stream.send_message(HealthCheckResponse(
                        status=_status(checks),
                    ))
            finally:
                # Cancel first: it doesn't involve awaiting, so pending
                # tasks can't be leaked if unsubscribing below fails or is
                # itself cancelled
                for wait in waits.values():
                    if not wait.done():
                        wait.cancel()
                for check, event in subscriptions:
                    await check.__unsubscribe__(event)
|