1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
|
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import asyncio
import logging
import uamqp
from uamqp import c_uamqp, connection
from uamqp.async_ops.utils import get_dict_with_loop_if_needed
_logger = logging.getLogger(__name__)
class ConnectionAsync(connection.Connection):
"""An Asynchronous AMQP Connection. A single Connection can have multiple
Sessions, and can be shared between multiple Clients.
:ivar max_frame_size: Maximum AMQP frame size. Default is 63488 bytes.
:vartype max_frame_size: int
:ivar channel_max: Maximum number of Session channels in the Connection.
:vartype channel_max: int
:ivar idle_timeout: Timeout in milliseconds after which the Connection will close
if there is no further activity.
:vartype idle_timeout: int
:ivar properties: Connection properties.
:vartype properties: dict
:param hostname: The hostname of the AMQP service with which to establish
a connection.
:type hostname: bytes or str
:param sasl: Authentication for the connection. If none is provided SASL Annoymous
authentication will be used.
:type sasl: ~uamqp.authentication.common.AMQPAuth
:param container_id: The name for the client, also known as the Container ID.
If no name is provided, a random GUID will be used.
:type container_id: str or bytes
:param max_frame_size: Maximum AMQP frame size. Default is 63488 bytes.
:type max_frame_size: int
:param channel_max: Maximum number of Session channels in the Connection.
:type channel_max: int
:param idle_timeout: Timeout in milliseconds after which the Connection will close
if there is no further activity.
:type idle_timeout: int
:param properties: Connection properties.
:type properties: dict
:param remote_idle_timeout_empty_frame_send_ratio: Ratio of empty frames to
idle time for Connections with no activity. Value must be between
0.0 and 1.0 inclusive. Default is 0.5.
:type remote_idle_timeout_empty_frame_send_ratio: float
:param debug: Whether to turn on network trace logs. If `True`, trace logs
will be logged at INFO level. Default is `False`.
:type debug: bool
:param encoding: The encoding to use for parameters supplied as strings.
Default is 'UTF-8'
:type encoding: str
"""
def __init__(self, hostname, sasl,
container_id=False,
max_frame_size=None,
channel_max=None,
idle_timeout=None,
properties=None,
remote_idle_timeout_empty_frame_send_ratio=None,
error_policy=None,
debug=False,
encoding='UTF-8',
loop=None):
self._internal_kwargs = get_dict_with_loop_if_needed(loop)
super(ConnectionAsync, self).__init__(
hostname, sasl,
container_id=container_id,
max_frame_size=max_frame_size,
channel_max=channel_max,
idle_timeout=idle_timeout,
properties=properties,
remote_idle_timeout_empty_frame_send_ratio=remote_idle_timeout_empty_frame_send_ratio,
error_policy=error_policy,
debug=debug,
encoding=encoding)
self._async_lock = asyncio.Lock(**self._internal_kwargs)
async def __aenter__(self):
"""Open the Connection in an async context manager."""
return self
async def __aexit__(self, *args):
"""Close the Connection when exiting an async context manager."""
_logger.debug("Exiting connection %r context.", self.container_id)
await self.destroy_async()
_logger.debug("Finished exiting connection %r context.", self.container_id)
async def _close_async(self):
_logger.info("Shutting down connection %r.", self.container_id)
self._closing = True
if self._cbs:
await self.auth.close_authenticator_async()
self._cbs = None
self._conn.destroy()
self.auth.close()
_logger.info("Connection shutdown complete %r.", self.container_id)
@property
def loop(self):
return self._internal_kwargs.get("loop")
async def lock_async(self, timeout=3.0):
await asyncio.wait_for(self._async_lock.acquire(), timeout=timeout, **self._internal_kwargs)
def release_async(self):
try:
self._async_lock.release()
except RuntimeError:
pass
except:
_logger.debug("Got error when attempting to release async connection lock.")
try:
self._async_lock.release()
except RuntimeError:
pass
raise
async def work_async(self):
"""Perform a single Connection iteration asynchronously."""
try:
raise self._error
except TypeError:
pass
except Exception as e:
_logger.warning("%r", e)
raise
try:
await self.lock_async()
if self._closing:
_logger.debug("Connection unlocked but shutting down.")
return
await asyncio.sleep(0, **self._internal_kwargs)
self._conn.do_work()
except asyncio.TimeoutError:
_logger.debug("Connection %r timed out while waiting for lock acquisition.", self.container_id)
finally:
await asyncio.sleep(0, **self._internal_kwargs)
self.release_async()
async def sleep_async(self, seconds):
"""Lock the connection for a given number of seconds.
:param seconds: Length of time to lock the connection.
:type seconds: int
"""
try:
await self.lock_async()
await asyncio.sleep(seconds, **self._internal_kwargs)
except asyncio.TimeoutError:
_logger.debug("Connection %r timed out while waiting for lock acquisition.", self.container_id)
finally:
self.release_async()
async def redirect_async(self, redirect_error, auth):
"""Redirect the connection to an alternative endpoint.
:param redirect: The Link DETACH redirect details.
:type redirect: ~uamqp.errors.LinkRedirect
:param auth: Authentication credentials to the redirected endpoint.
:type auth: ~uamqp.authentication.common.AMQPAuth
"""
_logger.info("Redirecting connection %r.", self.container_id)
try:
await self.lock_async()
if self.hostname == redirect_error.hostname:
return
if self._state != c_uamqp.ConnectionState.END:
await self._close_async()
self.hostname = redirect_error.hostname
self.auth = auth
self._conn = self._create_connection(auth)
for setting, value in self._settings.items():
setattr(self, setting, value)
self._error = None
self._closing = False
except asyncio.TimeoutError:
_logger.debug("Connection %r timed out while waiting for lock acquisition.", self.container_id)
finally:
self.release_async()
async def destroy_async(self):
"""Close the connection asynchronously, and close any associated
CBS authentication session.
"""
try:
await self.lock_async()
_logger.debug("Unlocked connection %r to close.", self.container_id)
await self._close_async()
except asyncio.TimeoutError:
_logger.debug(
"Connection %r timed out while waiting for lock acquisition on destroy. Destroying anyway.",
self.container_id)
await self._close_async()
finally:
self.release_async()
uamqp._Platform.deinitialize() # pylint: disable=protected-access
|