File: receiver_async.py

package info (click to toggle)
azure-uamqp-python 1.6.11-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 35,584 kB
  • sloc: ansic: 184,383; cpp: 7,738; python: 7,733; cs: 5,767; sh: 983; xml: 298; makefile: 34
file content (153 lines) | stat: -rw-r--r-- 7,095 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import logging
import asyncio

from uamqp import constants, errors, receiver
from uamqp.async_ops.utils import get_dict_with_loop_if_needed

_logger = logging.getLogger(__name__)


class MessageReceiverAsync(receiver.MessageReceiver):
    """An asynchronous Message Receiver that opens its own exclsuive Link on an
    existing Session.

    :ivar receive_settle_mode: The mode by which to settle message receive
     operations. If set to `PeekLock`, the receiver will lock a message once received until
     the client accepts or rejects the message. If set to `ReceiveAndDelete`, the service
     will assume successful receipt of the message and clear it from the queue. The
     default is `PeekLock`.
    :vartype receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
    :ivar send_settle_mode: The mode by which to settle message send
     operations. If set to `Unsettled`, the client will wait for a confirmation
     from the service that the message was successfully sent. If set to 'Settled',
     the client will not wait for confirmation and assume success.
    :vartype send_settle_mode: ~uamqp.constants.SenderSettleMode
    :ivar max_message_size: The maximum allowed message size negotiated for the Link.
    :vartype max_message_size: int

    :param session: The underlying Session with which to receive.
    :type session: ~uamqp.session.Session
    :param source: The AMQP endpoint to receive from.
    :type source: ~uamqp.address.Source
    :param target: The name of target (i.e. the client).
    :type target: str or bytes
    :param name: A unique name for the receiver. If not specified a GUID will be used.
    :type name: str or bytes
    :param receive_settle_mode: The mode by which to settle message receive
     operations. If set to `PeekLock`, the receiver will lock a message once received until
     the client accepts or rejects the message. If set to `ReceiveAndDelete`, the service
     will assume successful receipt of the message and clear it from the queue. The
     default is `PeekLock`.
    :type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
    :param send_settle_mode: The mode by which to settle message send
     operations. If set to `Unsettled`, the client will wait for a confirmation
     from the service that the message was successfully sent. If set to 'Settled',
     the client will not wait for confirmation and assume success.
    :type send_settle_mode: ~uamqp.constants.SenderSettleMode
    :param desired_capabilities: The extension capabilities desired from the peer endpoint.
     To create a desired_capabilities object, please do as follows:
        - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
        - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
    :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
    :param max_message_size: The maximum allowed message size negotiated for the Link.
    :type max_message_size: int
    :param prefetch: The receiver Link credit that determines how many
     messages the Link will attempt to handle per connection iteration.
    :type prefetch: int
    :param properties: Metadata to be sent in the Link ATTACH frame.
    :type properties: dict
    :param error_policy: A policy for parsing errors on link, connection and message
     disposition to determine whether the error should be retryable.
    :type error_policy: ~uamqp.errors.ErrorPolicy
    :param debug: Whether to turn on network trace logs. If `True`, trace logs
     will be logged at INFO level. Default is `False`.
    :type debug: bool
    :param encoding: The encoding to use for parameters supplied as strings.
     Default is 'UTF-8'
    :type encoding: str
    """

    def __init__(self, session, source, target,
                 on_message_received,
                 name=None,
                 receive_settle_mode=constants.ReceiverSettleMode.PeekLock,
                 send_settle_mode=constants.SenderSettleMode.Unsettled,
                 max_message_size=constants.MAX_MESSAGE_LENGTH_BYTES,
                 prefetch=300,
                 properties=None,
                 error_policy=None,
                 debug=False,
                 encoding='UTF-8',
                 desired_capabilities=None,
                 loop=None):
        self._internal_kwargs = get_dict_with_loop_if_needed(loop)
        super(MessageReceiverAsync, self).__init__(
            session, source, target,
            on_message_received,
            name=name,
            receive_settle_mode=receive_settle_mode,
            send_settle_mode=send_settle_mode,
            max_message_size=max_message_size,
            prefetch=prefetch,
            properties=properties,
            error_policy=error_policy,
            debug=debug,
            encoding=encoding,
            desired_capabilities=desired_capabilities)

    async def __aenter__(self):
        """Open the MessageReceiver in an async context manager."""
        await self.open_async()
        return self

    async def __aexit__(self, *args):
        """Close the MessageReceiver when exiting an async context manager."""
        await self.destroy_async()

    @property
    def loop(self):
        return self._internal_kwargs.get("loop")

    async def destroy_async(self):
        """Asynchronously close both the Receiver and the Link. Clean up any C objects."""
        self.destroy()

    async def open_async(self):
        """Asynchronously open the MessageReceiver in order to start
        processing messages.

        :raises: ~uamqp.errors.AMQPConnectionError if the Receiver raises
         an error on opening. This can happen if the source URI is invalid
         or the credentials are rejected.
        """
        try:
            self._receiver.open(self)
        except ValueError:
            raise errors.AMQPConnectionError(
                "Failed to open Message Receiver. "
                "Please confirm credentials and target URI.")

    async def work_async(self):
        """Update the link status."""
        await asyncio.sleep(0, **self._internal_kwargs)
        self._link.do_work()

    async def reset_link_credit_async(self, link_credit, **kwargs):
        """Asynchronously reset the link credit. This method would send flow control frame to the sender.

        :param link_credit: The link credit amount that is requested.
        :type link_credit: int
        """
        await asyncio.sleep(0, **self._internal_kwargs)
        drain = kwargs.get("drain", False)
        self._link.reset_link_credit(link_credit, drain)

    async def close_async(self):
        """Close the Receiver asynchronously, leaving the link intact."""
        self.close()