File: __init__.py

package info (click to toggle)
azure-uamqp-python 1.6.11-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 35,584 kB
  • sloc: ansic: 184,383; cpp: 7,738; python: 7,733; cs: 5,767; sh: 983; xml: 298; makefile: 34
file content (172 lines) | stat: -rw-r--r-- 6,945 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

# pylint: disable=no-member

import logging
import sys

from uamqp import c_uamqp  # pylint: disable=import-self

from uamqp.message import Message, BatchMessage
from uamqp.address import Source, Target

from uamqp.connection import Connection
from uamqp.session import Session
from uamqp.client import AMQPClient, SendClient, ReceiveClient
from uamqp.sender import MessageSender
from uamqp.receiver import MessageReceiver
from uamqp.constants import TransportType, MessageBodyType

try:
    from uamqp.async_ops import ConnectionAsync
    from uamqp.async_ops import SessionAsync
    from uamqp.async_ops import MessageSenderAsync
    from uamqp.async_ops import MessageReceiverAsync
    from uamqp.async_ops.client_async import (
        AMQPClientAsync,
        SendClientAsync,
        ReceiveClientAsync,
        AsyncMessageIter)
except (SyntaxError, ImportError):
    pass  # Async not supported.


__version__ = "1.6.11"


_logger = logging.getLogger(__name__)
_is_win = sys.platform.startswith('win')
c_uamqp.set_python_logger()


def send_message(target, data, auth=None, debug=False):
    """Send a single message to AMQP endpoint.

    :param target: The target AMQP endpoint.
    :type target: str, bytes or ~uamqp.address.Target
    :param data: The contents of the message to send.
    :type data: str, bytes or ~uamqp.message.Message
    :param auth: The authentication credentials for the endpoint.
     This should be one of the subclasses of uamqp.authentication.AMQPAuth. Currently
     this includes:
        - uamqp.authentication.SASLAnonymous
        - uamqp.authentication.SASLPlain
        - uamqp.authentication.SASTokenAuth
     If no authentication is supplied, SASLAnnoymous will be used by default.
    :type auth: ~uamqp.authentication.common.AMQPAuth
    :param debug: Whether to turn on network trace logs. If `True`, trace logs
     will be logged at INFO level. Default is `False`.
    :type debug: bool
    :return: A list of states for each message sent.
    :rtype: list[~uamqp.constants.MessageState]
    """
    message = data if isinstance(data, Message) else Message(body=data)
    with SendClient(target, auth=auth, debug=debug) as send_client:
        send_client.queue_message(message)  # pylint: disable=no-member
        return send_client.send_all_messages()  # pylint: disable=no-member


def receive_message(source, auth=None, timeout=0, debug=False):
    """Receive a single message from an AMQP endpoint.

    :param source: The AMQP source endpoint to receive from.
    :type source: str, bytes or ~uamqp.address.Source
    :param auth: The authentication credentials for the endpoint.
     This should be one of the subclasses of uamqp.authentication.AMQPAuth. Currently
     this includes:
        - uamqp.authentication.SASLAnonymous
        - uamqp.authentication.SASLPlain
        - uamqp.authentication.SASTokenAuth
     If no authentication is supplied, SASLAnnoymous will be used by default.
    :type auth: ~uamqp.authentication.common.AMQPAuth
    :param timeout: The timeout in milliseconds after which to return None if no messages
     are retrieved. If set to `0` (the default), the receiver will not timeout and
     will continue to wait for messages until interrupted.
    :param debug: Whether to turn on network trace logs. If `True`, trace logs
     will be logged at INFO level. Default is `False`.
    :type debug: bool
    :rtype: ~uamqp.message.Message or None
    """
    received = receive_messages(source, auth=auth, max_batch_size=1, timeout=timeout, debug=debug)
    if received:
        return received[0]
    return None


def receive_messages(source, auth=None, max_batch_size=None, timeout=0, debug=False, **kwargs):
    """Receive a batch of messages from an AMQP endpoint.

    :param source: The AMQP source endpoint to receive from.
    :type source: str, bytes or ~uamqp.address.Source
    :param auth: The authentication credentials for the endpoint.
     This should be one of the subclasses of ~uamqp.authentication.AMQPAuth. Currently
     this includes:
        - uamqp.authentication.SASLAnonymous
        - uamqp.authentication.SASLPlain
        - uamqp.authentication.SASTokenAuth
     If no authentication is supplied, SASLAnnoymous will be used by default.
    :type auth: ~uamqp.authentication.common.AMQPAuth
    :param max_batch_size: The maximum number of messages to return in a batch. If the
     receiver receives a smaller number than this, it will not wait to return them so
     the actual number returned can be anything up to this value. If the receiver reaches
     a timeout, an empty list will be returned.
    :param timeout: The timeout in milliseconds after which to return if no messages
     are retrieved. If set to `0` (the default), the receiver will not timeout and
     will continue to wait for messages until interrupted.
    :param debug: Whether to turn on network trace logs. If `True`, trace logs
     will be logged at INFO level. Default is `False`.
    :type debug: bool
    :rtype: list[~uamqp.message.Message]
    """
    if max_batch_size:
        kwargs['prefetch'] = max_batch_size
    with ReceiveClient(source, auth=auth, debug=debug, **kwargs) as receive_client:
        return receive_client.receive_message_batch(  # pylint: disable=no-member
            max_batch_size=max_batch_size or receive_client._prefetch, timeout=timeout)  # pylint: disable=protected-access, no-member


class _Platform(object):
    """Runs any platform preparatory steps for the AMQP C
    library. This is primarily used for OpenSSL setup.

    :ivar initialized: When the setup has completed.
    :vartype initialized: bool
    """

    initialized = False

    @classmethod
    def initialize(cls):
        """Initialize the TLS/SSL platform to prepare it for
        making AMQP requests. This only needs to happen once.
        """
        if cls.initialized:
            _logger.debug("Platform already initialized.")
        else:
            _logger.debug("Initializing platform.")
            c_uamqp.platform_init()
            cls.initialized = True

    @classmethod
    def deinitialize(cls):
        """Deinitialize the TLS/SSL platform to prepare it for
        making AMQP requests. This only needs to happen once.
        """
        if not cls.initialized:
            _logger.debug("Platform already deinitialized.")
        else:
            #cls.initialized = False
            _logger.debug("Deinitializing platform.")
            #c_uamqp.platform_deinit()


def get_platform_info():
    """Gets the current platform information.

    :rtype: str
    """
    return str(c_uamqp.get_info())