1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600
|
"""
Implementation of `nbio_interface.AbstractIOServices` on top of a
selector-based I/O loop, such as tornado's and our home-grown
select_connection's I/O loops.
"""
import abc
import logging
import socket
import threading
from pika.adapters.utils import nbio_interface, io_services_utils
from pika.adapters.utils.io_services_utils import (check_callback_arg,
check_fd_arg)
LOGGER = logging.getLogger(__name__)
class AbstractSelectorIOLoop(object):
"""Selector-based I/O loop interface expected by
`selector_ioloop_adapter.SelectorIOServicesAdapter`
NOTE: this interface follows the corresponding methods and attributes
of `tornado.ioloop.IOLoop` in order to avoid additional adapter layering
when wrapping tornado's IOLoop.
"""
@property
@abc.abstractmethod
def READ(self): # pylint: disable=C0103
"""The value of the I/O loop's READ flag; READ/WRITE/ERROR may be used
with bitwise operators as expected.
Implementation note: the implementations can simply replace these
READ/WRITE/ERROR properties with class-level attributes
"""
@property
@abc.abstractmethod
def WRITE(self): # pylint: disable=C0103
"""The value of the I/O loop's WRITE flag; READ/WRITE/ERROR may be used
with bitwise operators as expected
"""
@property
@abc.abstractmethod
def ERROR(self): # pylint: disable=C0103
"""The value of the I/O loop's ERROR flag; READ/WRITE/ERROR may be used
with bitwise operators as expected
"""
@abc.abstractmethod
def close(self):
"""Release IOLoop's resources.
the `close()` method is intended to be called by the application or test
code only after `start()` returns. After calling `close()`, no other
interaction with the closed instance of `IOLoop` should be performed.
"""
@abc.abstractmethod
def start(self):
"""Run the I/O loop. It will loop until requested to exit. See `stop()`.
"""
@abc.abstractmethod
def stop(self):
"""Request exit from the ioloop. The loop is NOT guaranteed to
stop before this method returns.
To invoke `stop()` safely from a thread other than this IOLoop's thread,
call it via `add_callback_threadsafe`; e.g.,
`ioloop.add_callback(ioloop.stop)`
"""
@abc.abstractmethod
def call_later(self, delay, callback):
"""Add the callback to the IOLoop timer to be called after delay seconds
from the time of call on best-effort basis. Returns a handle to the
timeout.
:param float delay: The number of seconds to wait to call callback
:param callable callback: The callback method
:returns: handle to the created timeout that may be passed to
`remove_timeout()`
:rtype: object
"""
@abc.abstractmethod
def remove_timeout(self, timeout_handle):
"""Remove a timeout
:param timeout_handle: Handle of timeout to remove
"""
@abc.abstractmethod
def add_callback(self, callback):
"""Requests a call to the given function as soon as possible in the
context of this IOLoop's thread.
NOTE: This is the only thread-safe method in IOLoop. All other
manipulations of IOLoop must be performed from the IOLoop's thread.
For example, a thread may request a call to the `stop` method of an
ioloop that is running in a different thread via
`ioloop.add_callback_threadsafe(ioloop.stop)`
:param callable callback: The callback method
"""
@abc.abstractmethod
def add_handler(self, fd, handler, events):
"""Start watching the given file descriptor for events
:param int fd: The file descriptor
:param callable handler: When requested event(s) occur,
`handler(fd, events)` will be called.
:param int events: The event mask using READ, WRITE, ERROR.
"""
@abc.abstractmethod
def update_handler(self, fd, events):
"""Changes the events we watch for
:param int fd: The file descriptor
:param int events: The event mask using READ, WRITE, ERROR
"""
@abc.abstractmethod
def remove_handler(self, fd):
"""Stop watching the given file descriptor for events
:param int fd: The file descriptor
"""
class SelectorIOServicesAdapter(io_services_utils.SocketConnectionMixin,
io_services_utils.StreamingConnectionMixin,
nbio_interface.AbstractIOServices,
nbio_interface.AbstractFileDescriptorServices):
"""Implements the
:py:class:`.nbio_interface.AbstractIOServices` interface
on top of selector-style native loop having the
:py:class:`AbstractSelectorIOLoop` interface, such as
:py:class:`pika.selection_connection.IOLoop` and :py:class:`tornado.IOLoop`.
NOTE:
:py:class:`.nbio_interface.AbstractFileDescriptorServices`
interface is only required by the mixins.
"""
def __init__(self, native_loop):
"""
:param AbstractSelectorIOLoop native_loop: An instance compatible with
the `AbstractSelectorIOLoop` interface, but not necessarily derived
from it.
"""
self._loop = native_loop
# Active watchers: maps file descriptors to `_FileDescriptorCallbacks`
self._watchers = dict()
# Native loop-specific event masks of interest
self._readable_mask = self._loop.READ
# NOTE: tying ERROR to WRITE is particularly handy for Windows, whose
# `select.select()` differs from Posix by reporting
# connection-establishment failure only through exceptfds (ERROR event),
# while the typical application workflow is to wait for the socket to
# become writable when waiting for socket connection to be established.
self._writable_mask = self._loop.WRITE | self._loop.ERROR
def get_native_ioloop(self):
"""Implement
:py:meth:`.nbio_interface.AbstractIOServices.get_native_ioloop()`.
"""
return self._loop
def close(self):
"""Implement :py:meth:`.nbio_interface.AbstractIOServices.close()`.
"""
self._loop.close()
def run(self):
"""Implement :py:meth:`.nbio_interface.AbstractIOServices.run()`.
"""
self._loop.start()
def stop(self):
"""Implement :py:meth:`.nbio_interface.AbstractIOServices.stop()`.
"""
self._loop.stop()
def add_callback_threadsafe(self, callback):
"""Implement
:py:meth:`.nbio_interface.AbstractIOServices.add_callback_threadsafe()`.
"""
self._loop.add_callback(callback)
def call_later(self, delay, callback):
"""Implement :py:meth:`.nbio_interface.AbstractIOServices.call_later()`.
"""
return _TimerHandle(self._loop.call_later(delay, callback), self._loop)
def getaddrinfo(self,
host,
port,
on_done,
family=0,
socktype=0,
proto=0,
flags=0):
"""Implement :py:meth:`.nbio_interface.AbstractIOServices.getaddrinfo()`.
"""
return _SelectorIOLoopIOHandle(
_AddressResolver(
native_loop=self._loop,
host=host,
port=port,
family=family,
socktype=socktype,
proto=proto,
flags=flags,
on_done=on_done).start())
def set_reader(self, fd, on_readable):
"""Implement
:py:meth:`.nbio_interface.AbstractFileDescriptorServices.set_reader()`.
"""
LOGGER.debug('SelectorIOServicesAdapter.set_reader(%s, %r)', fd,
on_readable)
check_fd_arg(fd)
check_callback_arg(on_readable, 'on_readable')
try:
callbacks = self._watchers[fd]
except KeyError:
self._loop.add_handler(fd, self._on_reader_writer_fd_events,
self._readable_mask)
self._watchers[fd] = _FileDescriptorCallbacks(reader=on_readable)
LOGGER.debug('set_reader(%s, _) added handler Rd', fd)
else:
if callbacks.reader is None:
assert callbacks.writer is not None
self._loop.update_handler(
fd, self._readable_mask | self._writable_mask)
LOGGER.debug('set_reader(%s, _) updated handler RdWr', fd)
else:
LOGGER.debug('set_reader(%s, _) replacing reader', fd)
callbacks.reader = on_readable
def remove_reader(self, fd):
"""Implement
:py:meth:`.nbio_interface.AbstractFileDescriptorServices.remove_reader()`.
"""
LOGGER.debug('SelectorIOServicesAdapter.remove_reader(%s)', fd)
check_fd_arg(fd)
try:
callbacks = self._watchers[fd]
except KeyError:
LOGGER.debug('remove_reader(%s) neither was set', fd)
return False
if callbacks.reader is None:
assert callbacks.writer is not None
LOGGER.debug('remove_reader(%s) reader wasn\'t set Wr', fd)
return False
callbacks.reader = None
if callbacks.writer is None:
del self._watchers[fd]
self._loop.remove_handler(fd)
LOGGER.debug('remove_reader(%s) removed handler', fd)
else:
self._loop.update_handler(fd, self._writable_mask)
LOGGER.debug('remove_reader(%s) updated handler Wr', fd)
return True
def set_writer(self, fd, on_writable):
"""Implement
:py:meth:`.nbio_interface.AbstractFileDescriptorServices.set_writer()`.
"""
LOGGER.debug('SelectorIOServicesAdapter.set_writer(%s, %r)', fd,
on_writable)
check_fd_arg(fd)
check_callback_arg(on_writable, 'on_writable')
try:
callbacks = self._watchers[fd]
except KeyError:
self._loop.add_handler(fd, self._on_reader_writer_fd_events,
self._writable_mask)
self._watchers[fd] = _FileDescriptorCallbacks(writer=on_writable)
LOGGER.debug('set_writer(%s, _) added handler Wr', fd)
else:
if callbacks.writer is None:
assert callbacks.reader is not None
# NOTE: Set the writer func before setting the mask!
# Otherwise a race condition can occur where ioloop tries to
# call writer when it is still None.
callbacks.writer = on_writable
self._loop.update_handler(
fd, self._readable_mask | self._writable_mask)
LOGGER.debug('set_writer(%s, _) updated handler RdWr', fd)
else:
LOGGER.debug('set_writer(%s, _) replacing writer', fd)
callbacks.writer = on_writable
def remove_writer(self, fd):
"""Implement
:py:meth:`.nbio_interface.AbstractFileDescriptorServices.remove_writer()`.
"""
LOGGER.debug('SelectorIOServicesAdapter.remove_writer(%s)', fd)
check_fd_arg(fd)
try:
callbacks = self._watchers[fd]
except KeyError:
LOGGER.debug('remove_writer(%s) neither was set.', fd)
return False
if callbacks.writer is None:
assert callbacks.reader is not None
LOGGER.debug('remove_writer(%s) writer wasn\'t set Rd', fd)
return False
callbacks.writer = None
if callbacks.reader is None:
del self._watchers[fd]
self._loop.remove_handler(fd)
LOGGER.debug('remove_writer(%s) removed handler', fd)
else:
self._loop.update_handler(fd, self._readable_mask)
LOGGER.debug('remove_writer(%s) updated handler Rd', fd)
return True
def _on_reader_writer_fd_events(self, fd, events):
"""Handle indicated file descriptor events requested via `set_reader()`
and `set_writer()`.
:param fd: file descriptor
:param events: event mask using native loop's READ/WRITE/ERROR. NOTE:
depending on the underlying poller mechanism, ERROR may be indicated
upon certain file description state even though we don't request it.
We ignore ERROR here since `set_reader()`/`set_writer()` don't
request for it.
"""
callbacks = self._watchers[fd]
if events & self._readable_mask and callbacks.reader is None:
# NOTE: we check for consistency here ahead of the writer callback
# because the writer callback, if any, can change the events being
# watched
LOGGER.warning(
'READ indicated on fd=%s, but reader callback is None; '
'events=%s', fd, bin(events))
if events & self._writable_mask:
if callbacks.writer is not None:
callbacks.writer()
else:
LOGGER.warning(
'WRITE indicated on fd=%s, but writer callback is None; '
'events=%s', fd, bin(events))
if events & self._readable_mask:
if callbacks.reader is not None:
callbacks.reader()
else:
# Reader callback might have been removed in the scope of writer
# callback.
pass
class _FileDescriptorCallbacks(object):
"""Holds reader and writer callbacks for a file descriptor"""
__slots__ = ('reader', 'writer')
def __init__(self, reader=None, writer=None):
self.reader = reader
self.writer = writer
class _TimerHandle(nbio_interface.AbstractTimerReference):
"""This module's adaptation of `nbio_interface.AbstractTimerReference`.
"""
def __init__(self, handle, loop):
"""
:param opaque handle: timer handle from the underlying loop
implementation that may be passed to its `remove_timeout()` method
:param AbstractSelectorIOLoop loop: the I/O loop instance that created
the timeout.
"""
self._handle = handle
self._loop = loop
def cancel(self):
if self._loop is not None:
self._loop.remove_timeout(self._handle)
self._handle = None
self._loop = None
class _SelectorIOLoopIOHandle(nbio_interface.AbstractIOReference):
"""This module's adaptation of `nbio_interface.AbstractIOReference`
"""
def __init__(self, subject):
"""
:param subject: subject of the reference containing a `cancel()` method
"""
self._cancel = subject.cancel
def cancel(self):
"""Cancel pending operation
:returns: False if was already done or cancelled; True otherwise
:rtype: bool
"""
return self._cancel()
class _AddressResolver(object):
"""Performs getaddrinfo asynchronously using a thread, then reports result
via callback from the given I/O loop.
NOTE: at this stage, we're using a thread per request, which may prove
inefficient and even prohibitive if the app performs many of these
operations concurrently.
"""
NOT_STARTED = 0
ACTIVE = 1
CANCELED = 2
COMPLETED = 3
def __init__(self, native_loop, host, port, family, socktype, proto, flags,
on_done):
"""
:param AbstractSelectorIOLoop native_loop:
:param host: `see socket.getaddrinfo()`
:param port: `see socket.getaddrinfo()`
:param family: `see socket.getaddrinfo()`
:param socktype: `see socket.getaddrinfo()`
:param proto: `see socket.getaddrinfo()`
:param flags: `see socket.getaddrinfo()`
:param on_done: on_done(records|BaseException) callback for reporting
result from the given I/O loop. The single arg will be either an
exception object (check for `BaseException`) in case of failure or
the result returned by `socket.getaddrinfo()`.
"""
check_callback_arg(on_done, 'on_done')
self._state = self.NOT_STARTED
self._result = None
self._loop = native_loop
self._host = host
self._port = port
self._family = family
self._socktype = socktype
self._proto = proto
self._flags = flags
self._on_done = on_done
self._mutex = threading.Lock()
self._threading_timer = None
def _cleanup(self):
"""Release resources
"""
self._loop = None
self._threading_timer = None
self._on_done = None
def start(self):
"""Start asynchronous DNS lookup.
:rtype: nbio_interface.AbstractIOReference
"""
assert self._state == self.NOT_STARTED, self._state
self._state = self.ACTIVE
self._threading_timer = threading.Timer(0, self._resolve)
self._threading_timer.start()
return _SelectorIOLoopIOHandle(self)
def cancel(self):
"""Cancel the pending resolver
:returns: False if was already done or cancelled; True otherwise
:rtype: bool
"""
# Try to cancel, but no guarantees
with self._mutex:
if self._state == self.ACTIVE:
LOGGER.debug('Canceling resolver for %s:%s', self._host,
self._port)
self._state = self.CANCELED
# Attempt to cancel, but not guaranteed
self._threading_timer.cancel()
self._cleanup()
return True
else:
LOGGER.debug(
'Ignoring _AddressResolver cancel request when not ACTIVE; '
'(%s:%s); state=%s', self._host, self._port, self._state)
return False
def _resolve(self):
"""Call `socket.getaddrinfo()` and return result via user's callback
function on the given I/O loop
"""
try:
# NOTE: on python 2.x, can't pass keyword args to getaddrinfo()
result = socket.getaddrinfo(self._host, self._port, self._family,
self._socktype, self._proto,
self._flags)
except Exception as exc: # pylint: disable=W0703
LOGGER.error('Address resolution failed: %r', exc)
result = exc
self._result = result
# Schedule result to be returned to user via user's event loop
with self._mutex:
if self._state == self.ACTIVE:
self._loop.add_callback(self._dispatch_result)
else:
LOGGER.debug(
'Asynchronous getaddrinfo cancellation detected; '
'in thread; host=%r', self._host)
def _dispatch_result(self):
"""This is called from the user's I/O loop to pass the result to the
user via the user's on_done callback
"""
if self._state == self.ACTIVE:
self._state = self.COMPLETED
try:
LOGGER.debug(
'Invoking asynchronous getaddrinfo() completion callback; '
'host=%r', self._host)
self._on_done(self._result)
finally:
self._cleanup()
else:
LOGGER.debug(
'Asynchronous getaddrinfo cancellation detected; '
'in I/O loop context; host=%r', self._host)
|