1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
|
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import functools
import logging
import uuid
import uamqp
from uamqp import c_uamqp, constants, errors, utils
_logger = logging.getLogger(__name__)
class MessageReceiver(object):
"""A Message Receiver that opens its own exclsuive Link on an
existing Session.
:ivar receive_settle_mode: The mode by which to settle message receive
operations. If set to `PeekLock`, the receiver will lock a message once received until
the client accepts or rejects the message. If set to `ReceiveAndDelete`, the service
will assume successful receipt of the message and clear it from the queue. The
default is `PeekLock`.
:vartype receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
:ivar send_settle_mode: The mode by which to settle message send
operations. If set to `Unsettled`, the client will wait for a confirmation
from the service that the message was successfully sent. If set to 'Settled',
the client will not wait for confirmation and assume success.
:vartype send_settle_mode: ~uamqp.constants.SenderSettleMode
:ivar max_message_size: The maximum allowed message size negotiated for the Link.
:vartype max_message_size: int
:param session: The underlying Session with which to receive.
:type session: ~uamqp.session.Session
:param source: The AMQP endpoint to receive from.
:type source: ~uamqp.address.Source
:param target: The name of target (i.e. the client).
:type target: str or bytes
:param name: A unique name for the receiver. If not specified a GUID will be used.
:type name: str or bytes
:param receive_settle_mode: The mode by which to settle message receive
operations. If set to `PeekLock`, the receiver will lock a message once received until
the client accepts or rejects the message. If set to `ReceiveAndDelete`, the service
will assume successful receipt of the message and clear it from the queue. The
default is `PeekLock`.
:type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
:param send_settle_mode: The mode by which to settle message send
operations. If set to `Unsettled`, the client will wait for a confirmation
from the service that the message was successfully sent. If set to 'Settled',
the client will not wait for confirmation and assume success.
:type send_settle_mode: ~uamqp.constants.SenderSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param max_message_size: The maximum allowed message size negotiated for the Link.
:type max_message_size: int
:param prefetch: The receiver Link credit that determines how many
messages the Link will attempt to handle per connection iteration.
:type prefetch: int
:param properties: Metadata to be sent in the Link ATTACH frame.
:type properties: dict
:param error_policy: A policy for parsing errors on link, connection and message
disposition to determine whether the error should be retryable.
:type error_policy: ~uamqp.errors.ErrorPolicy
:param debug: Whether to turn on network trace logs. If `True`, trace logs
will be logged at INFO level. Default is `False`.
:type debug: bool
:param encoding: The encoding to use for parameters supplied as strings.
Default is 'UTF-8'
:type encoding: str
"""
def __init__(self, session, source, target,
on_message_received,
name=None,
receive_settle_mode=constants.ReceiverSettleMode.PeekLock,
send_settle_mode=constants.SenderSettleMode.Unsettled,
max_message_size=constants.MAX_MESSAGE_LENGTH_BYTES,
prefetch=300,
properties=None,
error_policy=None,
debug=False,
encoding='UTF-8',
desired_capabilities=None):
# pylint: disable=protected-access
if name:
self.name = name.encode(encoding) if isinstance(name, str) else name
else:
self.name = str(uuid.uuid4()).encode(encoding)
target = target.encode(encoding) if isinstance(target, str) else target
role = constants.Role.Receiver
self.source = source._address.value
self.target = c_uamqp.Messaging.create_target(target)
self.on_message_received = on_message_received
self.encoding = encoding
self.error_policy = error_policy or errors.ErrorPolicy()
self._settle_mode = receive_settle_mode
self._conn = session._conn
self._session = session
self._link = c_uamqp.create_link(session._session, self.name, role.value, self.source, self.target)
self._link.subscribe_to_detach_event(self)
if prefetch is not None:
self._link.set_prefetch_count(prefetch)
if properties:
self._link.set_attach_properties(utils.data_factory(properties, encoding=encoding))
if receive_settle_mode:
self.receive_settle_mode = receive_settle_mode
if send_settle_mode:
self.send_settle_mode = send_settle_mode
if max_message_size:
self.max_message_size = max_message_size
if desired_capabilities:
self._link.set_desired_capabilities(desired_capabilities)
self._receiver = c_uamqp.create_message_receiver(self._link, self)
self._receiver.set_trace(debug)
self._state = constants.MessageReceiverState.Idle
self._error = None
def __enter__(self):
"""Open the MessageReceiver in a context manager."""
self.open()
return self
def __exit__(self, *args):
"""Close the MessageReceiver when exiting a context manager."""
self.destroy()
def _state_changed(self, previous_state, new_state):
"""Callback called whenever the underlying Receiver undergoes a change
of state. This function wraps the states as Enums to prepare for
calling the public callback.
:param previous_state: The previous Receiver state.
:type previous_state: int
:param new_state: The new Receiver state.
:type new_state: int
"""
try:
try:
_previous_state = constants.MessageReceiverState(previous_state)
except ValueError:
_previous_state = previous_state
try:
_new_state = constants.MessageReceiverState(new_state)
except ValueError:
_new_state = new_state
if _previous_state == constants.MessageReceiverState.Opening \
and _new_state == constants.MessageReceiverState.Error:
_logger.info("Receiver link failed to open - expecting to receive DETACH frame.")
elif self._session._link_error: # pylint: disable=protected-access
_logger.info("Receiver link ATTACH frame invalid - expecting to receive DETACH frame.")
else:
self.on_state_changed(_previous_state, _new_state)
except KeyboardInterrupt:
_logger.error("Received shutdown signal while updating receiver state from {} to {}".format(
previous_state, new_state))
self._error = errors.AMQPClientShutdown()
def _detach_received(self, error):
"""Callback called when a link DETACH frame is received.
This callback will process the received DETACH error to determine if
the link is recoverable or whether it should be shutdown.
:param error: The error information from the detach
frame.
:type error: ~uamqp.errors.ErrorResponse
"""
# pylint: disable=protected-access
if error:
condition = error.condition
description = error.description
info = error.info
else:
condition = b"amqp:unknown-error"
description = None
info = None
self._error = errors._process_link_error(self.error_policy, condition, description, info)
_logger.info("Received Link detach event: %r\nLink: %r\nDescription: %r"
"\nDetails: %r\nRetryable: %r\nConnection: %r",
condition, self.name, description, info, self._error.action.retry,
self._session._connection.container_id)
def _settle_message(self, message_number, response):
"""Send a settle dispostition for a received message.
:param message_number: The delivery number of the message
to settle.
:type message_number: int
:response: The type of disposition to respond with, e.g. whether
the message was accepted, rejected or abandoned.
:type response: ~uamqp.errors.MessageResponse
"""
if not response or isinstance(response, errors.MessageAlreadySettled):
return
if isinstance(response, errors.MessageAccepted):
self._receiver.settle_accepted_message(message_number)
elif isinstance(response, errors.MessageReleased):
self._receiver.settle_released_message(message_number)
elif isinstance(response, errors.MessageRejected):
self._receiver.settle_rejected_message(
message_number,
response.error_condition,
response.error_description,
response.error_info)
elif isinstance(response, errors.MessageModified):
self._receiver.settle_modified_message(
message_number,
response.failed,
response.undeliverable,
response.annotations)
else:
raise ValueError("Invalid message response type: {}".format(response))
def _message_received(self, message):
"""Callback run on receipt of every message. If there is
a user-defined callback, this will be called.
Additionally if the client is retrieving messages for a batch
or iterator, the message will be added to an internal queue.
:param message: c_uamqp.Message
"""
# pylint: disable=protected-access
message_number = self._receiver.last_received_message_number()
if self._settle_mode == constants.ReceiverSettleMode.ReceiveAndDelete:
settler = None
else:
settler = functools.partial(self._settle_message, message_number)
try:
wrapped_message = uamqp.Message(
message=message,
encoding=self.encoding,
settler=settler,
delivery_no=message_number)
self.on_message_received(wrapped_message)
except RuntimeError:
condition = b"amqp:unknown-error"
self._error = errors._process_link_error(self.error_policy, condition, None, None)
_logger.info("Unable to settle message no %r. Disconnecting.\nLink: %r\nConnection: %r",
message_number,
self.name,
self._session._connection.container_id)
except KeyboardInterrupt:
_logger.error("Received shutdown signal while processing message no %r\nRejecting message.", message_number)
self._receiver.settle_modified_message(message_number, True, True, None)
self._error = errors.AMQPClientShutdown()
except Exception as e: # pylint: disable=broad-except
_logger.error("Error processing message no %r: %r\nRejecting message.", message_number, e)
self._receiver.settle_modified_message(message_number, True, True, None)
def get_state(self):
"""Get the state of the MessageReceiver and its underlying Link.
:rtype: ~uamqp.constants.MessageReceiverState
"""
try:
raise self._error
except TypeError:
pass
except errors.LinkRedirect as e:
_logger.info("%r", e)
raise
except Exception as e:
_logger.warning("%r", e)
raise
return self._state
def work(self):
"""Update the link status."""
self._link.do_work()
def reset_link_credit(self, link_credit, **kwargs):
"""Reset the link credit. This method would send flow control frame to the sender.
:param link_credit: The link credit amount that is requested.
:type link_credit: int
"""
drain = kwargs.get("drain", False)
self._link.reset_link_credit(link_credit, drain)
def destroy(self):
"""Close both the Receiver and the Link. Clean up any C objects."""
self._receiver.destroy()
self._link.destroy()
def open(self):
"""Open the MessageReceiver in order to start processing messages.
:raises: ~uamqp.errors.AMQPConnectionError if the Receiver raises
an error on opening. This can happen if the source URI is invalid
or the credentials are rejected.
"""
try:
self._receiver.open(self)
except ValueError:
raise errors.AMQPConnectionError(
"Failed to open Message Receiver. "
"Please confirm credentials and target URI.")
def close(self):
"""Close the Receiver, leaving the link intact."""
self._receiver.close()
def on_state_changed(self, previous_state, new_state):
"""Callback called whenever the underlying Receiver undergoes a change
of state. This function can be overridden.
:param previous_state: The previous Receiver state.
:type previous_state: ~uamqp.constants.MessageReceiverState
:param new_state: The new Receiver state.
:type new_state: ~uamqp.constants.MessageReceiverState
"""
# pylint: disable=protected-access
_logger.info("Message receiver %r state changed from %r to %r on connection: %r",
self.name, previous_state, new_state, self._session._connection.container_id)
self._state = new_state
@property
def receive_settle_mode(self):
return self._link.receive_settle_mode
@receive_settle_mode.setter
def receive_settle_mode(self, value):
self._link.receive_settle_mode = value.value
@property
def send_settle_mode(self):
return self._link.send_settle_mode
@send_settle_mode.setter
def send_settle_mode(self, value):
self._link.send_settle_mode = value.value
@property
def max_message_size(self):
return self._link.max_message_size
@max_message_size.setter
def max_message_size(self, value):
self._link.max_message_size = int(value)
|