| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 
 | ##
# .notifyman - Receive and manage NOTIFY events.
##
"""
Notification Management Tools

Primarily, this module houses the `NotificationManager` class, which provides an
iterator for a NOTIFY event loop against a set of connections.

	>>> import postgresql
	>>> db = postgresql.open(...)
	>>> from postgresql.notifyman import NotificationManager
	>>> nm = NotificationManager(db, timeout = 10) # idle events every 10 seconds
	>>> for x in nm:
	...  if x is None:
	...   # idle event
	...   ...
	...   continue
	...  db, notifies = x
	...  for channel, payload, pid in notifies:
	...   ...
"""
from time import time
from select import select
from itertools import chain
class NotificationManager(object):
	"""
	A class for managing the asynchronous notifications received by a
	set of connections.

	Instances provide the iterator for an event loop that responds to NOTIFYs
	received by the connections being watched. There is no thread safety, so
	when a connection is being managed, it should not be used concurrently in
	other threads while being managed.

	Iterating yields `(connection, notifies)` pairs, where `notifies` is a list
	of `(channel, payload, pid)` tuples, and yields `None` as an "idle" event
	when a positive timeout elapses.
	"""
	__slots__ = (
		'connections',
		'garbage',
		'incoming',
		'timeout',
		'_last_time',
		'_pulled',
	)

	def __init__(self, *connections, timeout = None):
		"""
		Create a manager watching the given `connections`.

		`timeout` is validated and stored by `settimeout`; see that method for
		the meaning of `None`, `0`, and positive values.
		"""
		self.settimeout(timeout)
		self.connections = set(connections)
		# Connections that failed.
		self.garbage = set()
		# Used to store NOTIFYs consumed from the connections
		self.incoming = None
		# Start of the current idle interval; None when no interval is open.
		self._last_time = None
		# connection -> sequence of NOTIFYs
		self._pulled = dict()

	# Check the wire *and* wait for new messages.
	def _wait_on_wires(self, time = time, select = select):
		"""
		Wait for the live connections to become readable and have each
		readable connection collect its pending messages.

		Closed connections are moved to `garbage` first. Connections that
		`select` reports as exceptional, or whose no-op command raises, are
		handed to `trash`. `time` and `select` are injectable for testing.
		"""
		if self.timeout == 0:
			# We're polling.
			max_duration = 0
		else:
			# If timeout is None, we don't issue idle events, but
			# we still cycle in case the timeout is changed.
			if self._last_time is not None:
				max_duration = (self.timeout or 10) - (time() - self._last_time)
				if max_duration < 0:
					max_duration = 0
			else:
				self._last_time = time()
				max_duration = self.timeout or 10

		# Connections already marked as "bad" should not be checked.
		check = self.connections - self.garbage
		for db in check:
			if db.closed:
				self.connections.remove(db)
				self.garbage.add(db)
		check = self.connections - self.garbage

		if self.connections:
			r, w, x = select(check, (), check, max_duration)
		else:
			# Every managed connection has been retired, so there is nothing
			# to wait on: select() with three empty iterables is
			# platform-dependent, and where it works it would only stall
			# for max_duration before __next__ raises StopIteration.
			r, x = (), []

		# Make sure the connection's _notifies get filled.
		for db in r:
			# Collect any pending events.
			try:
				# Even if db is in a failed transaction, this
				# 'null' command will succeed.
				# (only connection failures blow up)
				db.execute('')
			except Exception:
				# failed to collect notifies; put in exception list.
				# It is very unlikely that this is *not* a FATAL error.
				x.append(db)
		self.trash(x)

	def trash(self, connections):
		"""
		Remove the given connections from the set of good connections, and add
		them to the `garbage` set.

		This method can be overridden by subclasses to take a callback approach
		to connection failures.
		"""
		# Identify the bad connections.
		self.garbage.update(connections)
		self.connections.difference_update(connections)

	def queue(self, db, notifies):
		"""
		Queue the notifies for the specified connection. The notifies are
		appended to the list pending for `db` and are emitted by the iterator
		once that list is moved into `incoming`. After this method returns
		without raising, the caller removes the queued notifies from the
		connection's own list.

		This method can be overridden by subclasses to take a callback approach
		to notification management.
		"""
		l = self._pulled.setdefault(db, list())
		l.extend(notifies)

	# Check the connection's _notifies list; just scan everything.
	def _pull_from_connections(self):
		"""
		Decode the NOTIFYs held by each managed connection, pass them to
		`queue`, and remove them from the connection.
		"""
		for db in self.connections:
			if not db._notifies:
				# nothing queued up, look at the next connection
				continue
			# Pull notifies into the NotificationManager
			decode = db.typio.decode
			notifies = [
				(decode(x.channel), decode(x.payload), x.pid)
				for x in db._notifies
			]
			self.queue(db, notifies)
			# Only drop what was queued; this runs after queue() succeeded.
			del db._notifies[:len(notifies)]

	# "Append" the pulled NOTIFYs to the 'incoming' iterator.
	def _queue_next(self):
		"""
		Move everything collected in `_pulled` into the `incoming` iterator
		as `(connection, notifies)` pairs and reset `_pulled`.
		"""
		new_seqs = list(self._pulled.items())
		if new_seqs:
			if self.incoming:
				# Already have incoming; not an expected condition,
				# but let's compensate.
				self.incoming, self._pulled = chain(self.incoming, iter(new_seqs)), {}
			else:
				self.incoming, self._pulled = iter(new_seqs), {}
		elif self.incoming is None:
			# Use this to trigger the StopIteration case of zero-timeout.
			self.incoming, self._pulled = iter(()), {}

	def _timedout(self, time = time):
		"""
		Return True when a positive timeout has elapsed since `_last_time`.

		As a side effect, `_last_time` is started when it is unset and is
		cleared otherwise.
		"""
		# Idles are guaranteed to occur, but make sure that
		# __next__ has a chance to check the connections and the wires.
		now = time()
		if self._last_time is None:
			self._last_time = now
		elif self.timeout and now >= (self._last_time + self.timeout):
			# Set last_time to None in case the timeout is so low
			# that this condition keeps NOTIFYs from being seen.
			self._last_time = None
			# Signal timeout.
			return True
		else:
			# toggle back to None.
			self._last_time = None
		return False

	def settimeout(self, seconds):
		"""
		Set the maximum duration, in seconds, for waiting for NOTIFYs on the
		set of managed connections. The given `seconds` argument can be a number
		or `None`.

		A timeout of `None` means no timeout, and "idle" events will never
		occur.

		A timeout of `0` means to never wait for NOTIFYs. This has the effect of
		a StopIteration being raised by `__next__` when there are no more
		Notifications available for any of the connections in the set. "Idle"
		events will never occur in this situation as well.

		A timeout greater than zero means to emit `None` as "idle" events into
		the loop at the specified interval. Idle events are guaranteed to occur.

		Raises `ValueError` when `seconds` is less than zero.
		"""
		if seconds is not None and seconds < 0:
			raise ValueError("cannot set timeout less than zero")
		self.timeout = seconds

	def gettimeout(self):
		'Get the timeout.'
		return self.timeout

	def __iter__(self):
		'Return the manager itself; it is its own iterator.'
		return self

	def __next__(self, time = time):
		"""
		Return the next `(connection, notifies)` pair, or `None` for an idle
		event.

		Raises StopIteration when the queued events are exhausted and either
		the timeout is zero or no managed connections remain. The `time`
		parameter is unused and kept only for interface compatibility.
		"""
		# Starts True so the first pass pulls already-received NOTIFYs
		# before any wait on the wire.
		checked_wire = True
		# Loop until NOTIFY received or timeout.
		while True:
			if self.incoming is not None:
				try:
					return next(self.incoming)
				except StopIteration:
					# Nothing more in this incoming.
					self.incoming = None
					# Allow a zero timeout to be used to indicate
					# that there are no NOTIFYs to be read.
					# This can be used to poll a set of
					# connections instead of listening.
					if self.timeout == 0 or not self.connections:
						raise
			# timeout happened? yield the "idle" event.
			# This check **must** happen after .incoming is checked.
			# Never emit idle when there are real events.
			if self._timedout():
				return None
			if not checked_wire and self.connections:
				# Nothing queued up, check connections if any.
				self._wait_on_wires()
				checked_wire = True
			else:
				checked_wire = False
			self._pull_from_connections()
			self._queue_next()
 |