1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
"""Handling of notifications."""
from __future__ import annotations
import select
from typing import TYPE_CHECKING, Callable
from .core import Query
from .error import db_error
if TYPE_CHECKING:
from .db import DB
# Public API: the only name exported by a star-import of this module.
__all__ = ['NotificationHandler']
# The notification handler
class NotificationHandler:
    """A PostgreSQL client-side asynchronous notification handler.

    An instance listens on two channels of one database connection: the
    event channel and a stop channel.  Calling the instance runs the
    receive loop, which passes every notification to the callback.
    """

    def __init__(self, db: DB, event: str, callback: Callable,
                 arg_dict: dict | None = None,
                 timeout: int | float | None = None,
                 stop_event: str | None = None):
        """Initialize the notification handler.

        You must pass a PyGreSQL database connection, the name of an
        event (notification channel) to listen for and a callback function.

        You can also specify a dictionary arg_dict that will be passed as
        the single argument to the callback function, and a timeout value
        in seconds (a floating point number denotes fractions of seconds).
        If it is absent or None, the callers will never time out.  If the
        timeout is reached, the callback function will be called with a
        single argument that is None.  If you set the timeout to zero,
        the handler will poll notifications synchronously and return.

        You can specify the name of the event that will be used to signal
        the handler to stop listening as stop_event.  By default, it will
        be the event name prefixed with 'stop_'.
        """
        self.db: DB | None = db
        self.event = event
        self.stop_event = stop_event or f'stop_{event}'
        self.listening = False
        self.callback = callback
        # Create a fresh dict per instance; it is mutated on every
        # notification before being handed to the callback.
        if arg_dict is None:
            arg_dict = {}
        self.arg_dict = arg_dict
        self.timeout = timeout

    def __del__(self) -> None:
        """Delete the notification handler."""
        self.unlisten()

    @staticmethod
    def _quote_ident(name: str) -> str:
        """Return name as a double-quoted SQL identifier.

        Embedded double quotes are doubled so that a channel name can
        never terminate the quoted identifier early.
        """
        escaped = name.replace('"', '""')
        return f'"{escaped}"'

    def close(self) -> None:
        """Stop listening and close the connection."""
        if self.db:
            self.unlisten()
            self.db.close()
            self.db = None

    def listen(self) -> None:
        """Start listening for the event and the stop event."""
        db = self.db
        if db and not self.listening:
            db.query(f'listen {self._quote_ident(self.event)}')
            db.query(f'listen {self._quote_ident(self.stop_event)}')
            self.listening = True

    def unlisten(self) -> None:
        """Stop listening for the event and the stop event."""
        db = self.db
        if db and self.listening:
            db.query(f'unlisten {self._quote_ident(self.event)}')
            db.query(f'unlisten {self._quote_ident(self.stop_event)}')
            self.listening = False

    def notify(self, db: DB | None = None, stop: bool = False,
               payload: str | None = None) -> Query | None:
        """Generate a notification.

        Optionally, you can pass a payload with the notification.

        If you set the stop flag, a stop notification will be sent that
        will cause the handler to stop listening.

        Returns the result of the query, or None if the handler is not
        listening or no database connection is available.

        Note: If the notification handler is running in another thread, you
        must pass a different database connection since PyGreSQL database
        connections are not thread-safe.
        """
        if not self.listening:
            return None
        if not db:
            db = self.db
            if not db:
                return None
        event = self.stop_event if stop else self.event
        cmd = f'notify {self._quote_ident(event)}'
        if payload:
            # The payload is sent as a string literal, so embedded single
            # quotes must be doubled.  NOTE(review): this assumes that
            # standard_conforming_strings is on (the PostgreSQL default),
            # so that backslashes need no escaping -- confirm if older
            # server configurations must be supported.
            escaped = payload.replace("'", "''")
            cmd += f", '{escaped}'"
        return db.query(cmd)

    def __call__(self) -> None:
        """Invoke the notification handler.

        The handler is a loop that listens for notifications on the event
        and stop event channels.  When either of these notifications is
        received, its associated 'pid', 'event' and 'extra' (the payload
        passed with the notification) are inserted into its arg_dict
        dictionary and the callback is invoked with this dictionary as
        a single argument.  When the handler receives a stop event, it
        stops listening to both events and returns.

        If the timeout expires before a notification arrives, the handler
        stops listening and invokes the callback with None.

        In the special case that the timeout of the handler has been set
        to zero, the handler will poll all events synchronously and return.
        It stays subscribed to both channels until it receives a stop event.

        Raises a database error if a notification arrives on a channel
        other than the event or stop event channel.

        Note: If you run this loop in another thread, don't use the same
        database connection for database operations in the main thread.
        """
        if not self.db:
            return
        self.listen()
        poll = self.timeout == 0
        # In polling mode select() is never called, so no descriptor
        # is needed.
        rlist = [] if poll else [self.db.fileno()]
        # The callback may close the handler, so both the connection and
        # the listening state are checked again on every iteration.
        while self.db and self.listening:
            # noinspection PyUnboundLocalVariable
            if poll or select.select(rlist, [], [], self.timeout)[0]:
                while self.db and self.listening:
                    notice = self.db.getnotify()
                    if not notice:  # no more messages
                        break
                    event, pid, extra = notice
                    if event not in (self.event, self.stop_event):
                        self.unlisten()
                        raise db_error(
                            f'Listening for "{self.event}"'
                            f' and "{self.stop_event}",'
                            f' but notified of "{event}"')
                    if event == self.stop_event:
                        # The stop notification is still passed on to
                        # the callback below.
                        self.unlisten()
                    self.arg_dict.update(pid=pid, event=event, extra=extra)
                    self.callback(self.arg_dict)
                if poll:
                    break
            else:  # we timed out
                self.unlisten()
                self.callback(None)
|