File: receiver.py

package info (click to toggle)
azure-uamqp-python 1.6.11-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 35,584 kB
  • sloc: ansic: 184,383; cpp: 7,738; python: 7,733; cs: 5,767; sh: 983; xml: 298; makefile: 34
file content (345 lines) | stat: -rw-r--r-- 15,434 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import functools
import logging
import uuid

import uamqp
from uamqp import c_uamqp, constants, errors, utils

_logger = logging.getLogger(__name__)


class MessageReceiver(object):
    """A Message Receiver that opens its own exclsuive Link on an
    existing Session.

    :ivar receive_settle_mode: The mode by which to settle message receive
     operations. If set to `PeekLock`, the receiver will lock a message once received until
     the client accepts or rejects the message. If set to `ReceiveAndDelete`, the service
     will assume successful receipt of the message and clear it from the queue. The
     default is `PeekLock`.
    :vartype receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
    :ivar send_settle_mode: The mode by which to settle message send
     operations. If set to `Unsettled`, the client will wait for a confirmation
     from the service that the message was successfully sent. If set to 'Settled',
     the client will not wait for confirmation and assume success.
    :vartype send_settle_mode: ~uamqp.constants.SenderSettleMode
    :ivar max_message_size: The maximum allowed message size negotiated for the Link.
    :vartype max_message_size: int

    :param session: The underlying Session with which to receive.
    :type session: ~uamqp.session.Session
    :param source: The AMQP endpoint to receive from.
    :type source: ~uamqp.address.Source
    :param target: The name of target (i.e. the client).
    :type target: str or bytes
    :param name: A unique name for the receiver. If not specified a GUID will be used.
    :type name: str or bytes
    :param receive_settle_mode: The mode by which to settle message receive
     operations. If set to `PeekLock`, the receiver will lock a message once received until
     the client accepts or rejects the message. If set to `ReceiveAndDelete`, the service
     will assume successful receipt of the message and clear it from the queue. The
     default is `PeekLock`.
    :type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
    :param send_settle_mode: The mode by which to settle message send
     operations. If set to `Unsettled`, the client will wait for a confirmation
     from the service that the message was successfully sent. If set to 'Settled',
     the client will not wait for confirmation and assume success.
    :type send_settle_mode: ~uamqp.constants.SenderSettleMode
    :param desired_capabilities: The extension capabilities desired from the peer endpoint.
     To create a desired_capabilities object, please do as follows:
        - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
        - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
    :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
    :param max_message_size: The maximum allowed message size negotiated for the Link.
    :type max_message_size: int
    :param prefetch: The receiver Link credit that determines how many
     messages the Link will attempt to handle per connection iteration.
    :type prefetch: int
    :param properties: Metadata to be sent in the Link ATTACH frame.
    :type properties: dict
    :param error_policy: A policy for parsing errors on link, connection and message
     disposition to determine whether the error should be retryable.
    :type error_policy: ~uamqp.errors.ErrorPolicy
    :param debug: Whether to turn on network trace logs. If `True`, trace logs
     will be logged at INFO level. Default is `False`.
    :type debug: bool
    :param encoding: The encoding to use for parameters supplied as strings.
     Default is 'UTF-8'
    :type encoding: str
    """

    def __init__(self, session, source, target,
                 on_message_received,
                 name=None,
                 receive_settle_mode=constants.ReceiverSettleMode.PeekLock,
                 send_settle_mode=constants.SenderSettleMode.Unsettled,
                 max_message_size=constants.MAX_MESSAGE_LENGTH_BYTES,
                 prefetch=300,
                 properties=None,
                 error_policy=None,
                 debug=False,
                 encoding='UTF-8',
                 desired_capabilities=None):
        # pylint: disable=protected-access
        if name:
            self.name = name.encode(encoding) if isinstance(name, str) else name
        else:
            self.name = str(uuid.uuid4()).encode(encoding)
        target = target.encode(encoding) if isinstance(target, str) else target
        role = constants.Role.Receiver

        self.source = source._address.value
        self.target = c_uamqp.Messaging.create_target(target)
        self.on_message_received = on_message_received
        self.encoding = encoding
        self.error_policy = error_policy or errors.ErrorPolicy()
        self._settle_mode = receive_settle_mode
        self._conn = session._conn
        self._session = session
        self._link = c_uamqp.create_link(session._session, self.name, role.value, self.source, self.target)
        self._link.subscribe_to_detach_event(self)
        if prefetch is not None:
            self._link.set_prefetch_count(prefetch)
        if properties:
            self._link.set_attach_properties(utils.data_factory(properties, encoding=encoding))
        if receive_settle_mode:
            self.receive_settle_mode = receive_settle_mode
        if send_settle_mode:
            self.send_settle_mode = send_settle_mode
        if max_message_size:
            self.max_message_size = max_message_size
        if desired_capabilities:
            self._link.set_desired_capabilities(desired_capabilities)

        self._receiver = c_uamqp.create_message_receiver(self._link, self)
        self._receiver.set_trace(debug)
        self._state = constants.MessageReceiverState.Idle
        self._error = None

    def __enter__(self):
        """Open the MessageReceiver in a context manager."""
        self.open()
        return self

    def __exit__(self, *args):
        """Close the MessageReceiver when exiting a context manager."""
        self.destroy()

    def _state_changed(self, previous_state, new_state):
        """Callback called whenever the underlying Receiver undergoes a change
        of state. This function wraps the states as Enums to prepare for
        calling the public callback.

        :param previous_state: The previous Receiver state.
        :type previous_state: int
        :param new_state: The new Receiver state.
        :type new_state: int
        """
        try:
            try:
                _previous_state = constants.MessageReceiverState(previous_state)
            except ValueError:
                _previous_state = previous_state
            try:
                _new_state = constants.MessageReceiverState(new_state)
            except ValueError:
                _new_state = new_state
            if _previous_state == constants.MessageReceiverState.Opening \
                    and _new_state == constants.MessageReceiverState.Error:
                _logger.info("Receiver link failed to open - expecting to receive DETACH frame.")
            elif self._session._link_error:  # pylint: disable=protected-access
                _logger.info("Receiver link ATTACH frame invalid - expecting to receive DETACH frame.")
            else:
                self.on_state_changed(_previous_state, _new_state)
        except KeyboardInterrupt:
            _logger.error("Received shutdown signal while updating receiver state from {} to {}".format(
                previous_state, new_state))
            self._error = errors.AMQPClientShutdown()

    def _detach_received(self, error):
        """Callback called when a link DETACH frame is received.
        This callback will process the received DETACH error to determine if
        the link is recoverable or whether it should be shutdown.

        :param error: The error information from the detach
         frame.
        :type error: ~uamqp.errors.ErrorResponse
        """
        # pylint: disable=protected-access
        if error:
            condition = error.condition
            description = error.description
            info = error.info
        else:
            condition = b"amqp:unknown-error"
            description = None
            info = None
        self._error = errors._process_link_error(self.error_policy, condition, description, info)
        _logger.info("Received Link detach event: %r\nLink: %r\nDescription: %r"
                     "\nDetails: %r\nRetryable: %r\nConnection: %r",
                     condition, self.name, description, info, self._error.action.retry,
                     self._session._connection.container_id)

    def _settle_message(self, message_number, response):
        """Send a settle dispostition for a received message.

        :param message_number: The delivery number of the message
         to settle.
        :type message_number: int
        :response: The type of disposition to respond with, e.g. whether
         the message was accepted, rejected or abandoned.
        :type response: ~uamqp.errors.MessageResponse
        """
        if not response or isinstance(response, errors.MessageAlreadySettled):
            return
        if isinstance(response, errors.MessageAccepted):
            self._receiver.settle_accepted_message(message_number)
        elif isinstance(response, errors.MessageReleased):
            self._receiver.settle_released_message(message_number)
        elif isinstance(response, errors.MessageRejected):
            self._receiver.settle_rejected_message(
                message_number,
                response.error_condition,
                response.error_description,
                response.error_info)
        elif isinstance(response, errors.MessageModified):
            self._receiver.settle_modified_message(
                message_number,
                response.failed,
                response.undeliverable,
                response.annotations)
        else:
            raise ValueError("Invalid message response type: {}".format(response))

    def _message_received(self, message):
        """Callback run on receipt of every message. If there is
        a user-defined callback, this will be called.
        Additionally if the client is retrieving messages for a batch
        or iterator, the message will be added to an internal queue.

        :param message: c_uamqp.Message
        """
        # pylint: disable=protected-access
        message_number = self._receiver.last_received_message_number()
        if self._settle_mode == constants.ReceiverSettleMode.ReceiveAndDelete:
            settler = None
        else:
            settler = functools.partial(self._settle_message, message_number)
        try:
            wrapped_message = uamqp.Message(
                message=message,
                encoding=self.encoding,
                settler=settler,
                delivery_no=message_number)
            self.on_message_received(wrapped_message)
        except RuntimeError:
            condition = b"amqp:unknown-error"
            self._error = errors._process_link_error(self.error_policy, condition, None, None)
            _logger.info("Unable to settle message no %r. Disconnecting.\nLink: %r\nConnection: %r",
                         message_number,
                         self.name,
                         self._session._connection.container_id)
        except KeyboardInterrupt:
            _logger.error("Received shutdown signal while processing message no %r\nRejecting message.", message_number)
            self._receiver.settle_modified_message(message_number, True, True, None)
            self._error = errors.AMQPClientShutdown()
        except Exception as e:  # pylint: disable=broad-except
            _logger.error("Error processing message no %r: %r\nRejecting message.", message_number, e)
            self._receiver.settle_modified_message(message_number, True, True, None)

    def get_state(self):
        """Get the state of the MessageReceiver and its underlying Link.

        :rtype: ~uamqp.constants.MessageReceiverState
        """
        try:
            raise self._error
        except TypeError:
            pass
        except errors.LinkRedirect as e:
            _logger.info("%r", e)
            raise
        except Exception as e:
            _logger.warning("%r", e)
            raise
        return self._state

    def work(self):
        """Update the link status."""
        self._link.do_work()

    def reset_link_credit(self, link_credit, **kwargs):
        """Reset the link credit. This method would send flow control frame to the sender.

        :param link_credit: The link credit amount that is requested.
        :type link_credit: int
        """
        drain = kwargs.get("drain", False)
        self._link.reset_link_credit(link_credit, drain)

    def destroy(self):
        """Close both the Receiver and the Link. Clean up any C objects."""
        self._receiver.destroy()
        self._link.destroy()

    def open(self):
        """Open the MessageReceiver in order to start processing messages.

        :raises: ~uamqp.errors.AMQPConnectionError if the Receiver raises
         an error on opening. This can happen if the source URI is invalid
         or the credentials are rejected.
        """
        try:
            self._receiver.open(self)
        except ValueError:
            raise errors.AMQPConnectionError(
                "Failed to open Message Receiver. "
                "Please confirm credentials and target URI.")

    def close(self):
        """Close the Receiver, leaving the link intact."""
        self._receiver.close()

    def on_state_changed(self, previous_state, new_state):
        """Callback called whenever the underlying Receiver undergoes a change
        of state. This function can be overridden.

        :param previous_state: The previous Receiver state.
        :type previous_state: ~uamqp.constants.MessageReceiverState
        :param new_state: The new Receiver state.
        :type new_state: ~uamqp.constants.MessageReceiverState
        """
        # pylint: disable=protected-access
        _logger.info("Message receiver %r state changed from %r to %r on connection: %r",
                     self.name, previous_state, new_state, self._session._connection.container_id)
        self._state = new_state

    @property
    def receive_settle_mode(self):
        return self._link.receive_settle_mode

    @receive_settle_mode.setter
    def receive_settle_mode(self, value):
        self._link.receive_settle_mode = value.value

    @property
    def send_settle_mode(self):
        return self._link.send_settle_mode

    @send_settle_mode.setter
    def send_settle_mode(self, value):
        self._link.send_settle_mode = value.value

    @property
    def max_message_size(self):
        return self._link.max_message_size

    @max_message_size.setter
    def max_message_size(self, value):
        self._link.max_message_size = int(value)