1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
|
import logging
import threading
import grpc
import six
from six.moves import queue
import etcd3.etcdrpc as etcdrpc
import etcd3.events as events
import etcd3.exceptions as exceptions
import etcd3.utils as utils
# Module-level logger named after this module; it is used below to record
# exceptions raised by user-supplied watch callbacks.
_log = logging.getLogger(__name__)
class Watch(object):
    """Handle for a single watch.

    Bundles the watch id, an optional iterator and the client object whose
    ``cancel_watch`` method is used to cancel the watch.
    """

    def __init__(self, watch_id, iterator=None, etcd_client=None):
        """Remember the watch id, its iterator and the owning client.

        :param watch_id: identifier passed to ``etcd_client.cancel_watch``
        :param iterator: optional iterator associated with this watch
        :param etcd_client: object providing ``cancel_watch(watch_id)``
        """
        self.watch_id = watch_id
        self.etcd_client = etcd_client
        # Stored under a private name: assigning ``self.iterator`` here used
        # to shadow the accessor of the same name on every instance, which
        # made the "Undefined iterator" check unreachable.
        self._iterator = iterator

    def cancel(self):
        """Cancel this watch through the client it was created with."""
        self.etcd_client.cancel_watch(self.watch_id)

    @property
    def iterator(self):
        """Return the iterator attached to this watch.

        Exposed as a property so that attribute-style access
        (``watch.iterator``) keeps working.

        :raises ValueError: if no iterator was supplied
        """
        if self._iterator is None:
            raise ValueError('Undefined iterator')
        return self._iterator

    @iterator.setter
    def iterator(self, value):
        # Keep the attribute assignable, as it was when it was a plain field.
        self._iterator = value
class Watcher(object):
    """Run many watches over one gRPC ``Watch`` stream.

    Requests are handed to the stream through a bounded queue. Responses are
    read by a daemon thread (started by the first ``add_callback`` call) and
    dispatched to the callback registered for the response's watch id.
    """

    def __init__(self, watchstub, timeout=None, call_credentials=None,
                 metadata=None):
        """Store the stub and call options; no thread is started here.

        :param watchstub: object whose ``Watch(request_iterator, ...)``
            method opens the stream
        :param timeout: seconds ``add_callback`` waits for a watch to be
            created; ``None`` waits without a limit
        :param call_credentials: passed as ``credentials`` to ``Watch``
        :param metadata: passed as ``metadata`` to ``Watch``
        """
        self.timeout = timeout
        self._watch_stub = watchstub
        self._credentials = call_credentials
        self._metadata = metadata

        self._lock = threading.Lock()
        self._request_queue = queue.Queue(maxsize=10)
        # Maps watch id -> callback for every established watch.
        self._callbacks = {}
        self._callback_thread = None
        # Shares ``_lock`` so waiting on it releases the state lock.
        self._new_watch_cond = threading.Condition(lock=self._lock)
        # The single create request currently awaiting a response, if any.
        self._new_watch = None

    def _create_watch_request(self, key, range_end=None, start_revision=None,
                              progress_notify=False, filters=None,
                              prev_kv=False):
        """Build a ``WatchRequest`` wrapping a ``WatchCreateRequest``.

        Optional fields are only set when a non-default value is given.

        :param key: key to watch, converted with ``utils.to_bytes``
        :param range_end: optional end of the watched range
        :param start_revision: optional revision to start watching from
        :param progress_notify: set the ``progress_notify`` flag when true
        :param filters: optional iterable of filter values
        :param prev_kv: set the ``prev_kv`` flag when true
        :returns: ``etcdrpc.WatchRequest`` carrying the create request
        """
        create_watch = etcdrpc.WatchCreateRequest()
        create_watch.key = utils.to_bytes(key)
        if range_end is not None:
            create_watch.range_end = utils.to_bytes(range_end)
        if start_revision is not None:
            create_watch.start_revision = start_revision
        if progress_notify:
            create_watch.progress_notify = progress_notify
        if filters is not None:
            # ``filters`` is a repeated protobuf field: direct assignment is
            # rejected by the generated message class, so extend it instead.
            create_watch.filters.extend(filters)
        if prev_kv:
            create_watch.prev_kv = prev_kv
        return etcdrpc.WatchRequest(create_request=create_watch)

    def add_callback(self, key, callback, range_end=None, start_revision=None,
                     progress_notify=False, filters=None, prev_kv=False):
        """Create a watch and register ``callback`` for its responses.

        Blocks until the watch is created, fails, or ``self.timeout``
        elapses.

        :param key: key to watch
        :param callback: callable invoked with a ``WatchResponse`` or with
            an exception instance
        :returns: id of the created watch
        :raises exceptions.WatchTimedOut: if neither an id nor an error was
            recorded by the time the wait returned
        :raises Exception: the error recorded for the request, e.g. a
            ``grpc.RpcError`` or ``exceptions.RevisionCompactedError``
        """
        rq = self._create_watch_request(key, range_end=range_end,
                                        start_revision=start_revision,
                                        progress_notify=progress_notify,
                                        filters=filters, prev_kv=prev_kv)

        with self._lock:
            # Start the callback thread if it is not yet running.
            if not self._callback_thread:
                thread_name = 'etcd3_watch_%x' % (id(self),)
                self._callback_thread = threading.Thread(name=thread_name,
                                                         target=self._run)
                self._callback_thread.daemon = True
                self._callback_thread.start()

            # Only one create watch request can be pending at a time, so if
            # there is one already, then wait for it to complete first.
            while self._new_watch:
                self._new_watch_cond.wait()

            # Submit a create watch request.
            new_watch = _NewWatch(callback)
            self._request_queue.put(rq)
            self._new_watch = new_watch

            try:
                # Wait for the request to be completed, or timeout.
                self._new_watch_cond.wait(timeout=self.timeout)

                # If the request is not completed yet, then raise a timeout
                # exception.
                if new_watch.id is None and new_watch.err is None:
                    raise exceptions.WatchTimedOut()

                # Raise an exception if the watch request failed.
                if new_watch.err:
                    raise new_watch.err
            finally:
                # Wake up threads stuck on add_callback call if any.
                self._new_watch = None
                self._new_watch_cond.notify_all()

            return new_watch.id

    def cancel(self, watch_id):
        """Forget the callback for ``watch_id`` and queue a cancel request.

        Does nothing when no callback is registered for ``watch_id``.
        """
        with self._lock:
            callback = self._callbacks.pop(watch_id, None)
            if not callback:
                return

            self._cancel_no_lock(watch_id)

    def _run(self):
        """Thread body: read the watch stream, reopening it after errors."""
        while True:
            response_iter = self._watch_stub.Watch(
                _new_request_iter(self._request_queue),
                credentials=self._credentials,
                metadata=self._metadata)
            try:
                for rs in response_iter:
                    self._handle_response(rs)

            except grpc.RpcError as err:
                with self._lock:
                    # Fail the pending create request, if there is one.
                    if self._new_watch:
                        self._new_watch.err = err
                        self._new_watch_cond.notify_all()

                    callbacks = self._callbacks
                    self._callbacks = {}

                    # Rotate request queue. This way we can terminate one gRPC
                    # stream and initiate another one whilst avoiding a race
                    # between them over requests in the queue.
                    self._request_queue.put(None)
                    self._request_queue = queue.Queue(maxsize=10)

                # Report the failure to every registered callback, outside
                # the lock.
                for callback in six.itervalues(callbacks):
                    _safe_callback(callback, err)

    def _handle_response(self, rs):
        """Process one stream response.

        Completes the pending create request when ``rs.created`` is set,
        then passes the response (or a compaction error) to the callback
        registered for ``rs.watch_id``.
        """
        with self._lock:
            if rs.created:
                # If the new watch request has already expired then cancel the
                # created watch right away.
                if not self._new_watch:
                    self._cancel_no_lock(rs.watch_id)
                    return

                if rs.compact_revision != 0:
                    self._new_watch.err = exceptions.RevisionCompactedError(
                        rs.compact_revision)
                    # Wake the add_callback() waiter so it raises the error
                    # now; without this it would sleep until its timeout, or
                    # forever when no timeout is configured.
                    self._new_watch_cond.notify_all()
                    return

                self._callbacks[rs.watch_id] = self._new_watch.callback
                self._new_watch.id = rs.watch_id
                self._new_watch_cond.notify_all()

            callback = self._callbacks.get(rs.watch_id)

        # Everything below runs outside the lock: callbacks are user code and
        # cancel() acquires the (non-reentrant) lock itself.

        # Ignore leftovers from canceled watches.
        if not callback:
            return

        # The watcher can be safely reused, but adding a new event
        # to indicate that the revision is already compacted
        # requires api change which would break all users of this
        # module. So, raising an exception if a watcher is still
        # alive.
        if rs.compact_revision != 0:
            err = exceptions.RevisionCompactedError(rs.compact_revision)
            _safe_callback(callback, err)
            self.cancel(rs.watch_id)
            return

        # Call the callback even when there are no events in the watch
        # response so as not to ignore progress notify responses.
        if rs.events or not (rs.created or rs.canceled):
            new_events = [events.new_event(event) for event in rs.events]
            response = WatchResponse(rs.header, new_events)
            _safe_callback(callback, response)

    def _cancel_no_lock(self, watch_id):
        """Queue a cancel request for ``watch_id``.

        Does not acquire ``self._lock``; the callers in this class already
        hold it.
        """
        cancel_watch = etcdrpc.WatchCancelRequest()
        cancel_watch.watch_id = watch_id
        rq = etcdrpc.WatchRequest(cancel_request=cancel_watch)
        self._request_queue.put(rq)
class WatchResponse(object):
    """Value handed to watch callbacks: a response header plus its events."""

    def __init__(self, header, events):
        """Keep ``header`` and ``events`` as plain attributes."""
        self.header, self.events = header, events
class _NewWatch(object):
    """Bookkeeping record for a create-watch request awaiting its result.

    ``id`` and ``err`` both start as ``None``; one of them is filled in once
    the request succeeds or fails.
    """

    def __init__(self, callback):
        self.callback = callback
        self.id = self.err = None
def _new_request_iter(_request_queue):
    """Yield items taken from ``_request_queue`` until ``None`` is received.

    Each item is fetched with a blocking ``get()`` only when the consumer
    asks for the next value; the ``None`` sentinel ends the generator and is
    not yielded.
    """
    fetch = _request_queue.get
    request = fetch()
    while request is not None:
        yield request
        request = fetch()
def _safe_callback(callback, response_or_err):
    """Invoke ``callback(response_or_err)`` without letting it raise.

    Any ``Exception`` raised by the callback is logged with its traceback
    and then suppressed, so a faulty callback cannot break the caller.
    """
    try:
        callback(response_or_err)
    except Exception:
        # Deliberate catch-all boundary around user code: log and continue.
        _log.exception('Watch callback failed')
    return None
|