1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
|
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import logging
from uamqp import c_uamqp, compat, constants, utils
_logger = logging.getLogger(__name__)
class Address(object):
"""Represents an AMQP endpoint.
:ivar address: The endpoint URL.
:vartype address: str
:ivar durable: Whether the endpoint is durable.
:vartype: bool
:ivar expiry_policy: The endpoint expiry policy
:ivar timeout: The endpoint timeout in seconds.
:vartype timeout: float
:ivar dynamic: Whether the endpoint is dynamic.
:vartype dynamic: bool
:ivar distribution_mode: The endpoint distribution mode.
:vartype distribution_mode: str
:param address: An AMQP endpoint URL.
:type address: str or bytes
:param encoding: The encoding used if address is supplied
as a str rather than bytes. Default is UTF-8.
"""
def __init__(self, address, encoding='UTF-8'):
address = address.encode(encoding) if isinstance(address, str) else address
self.parsed_address = self._validate_address(address)
self._encoding = encoding
self._address = None
addr = self.parsed_address.path
if self.parsed_address.hostname:
addr = self.parsed_address.hostname + addr
if self.parsed_address.scheme:
addr = self.parsed_address.scheme + b"://" + addr
self._c_address = c_uamqp.string_value(addr)
@classmethod
def from_c_obj(cls, c_value, encoding='UTF-8'):
address = c_value.address
py_obj = cls(address, encoding=encoding)
py_obj._address = c_value # pylint: disable=protected-access
return py_obj
def __repr__(self):
"""Get the Address as a URL.
:rtype: bytes
"""
return self.parsed_address.geturl()
def __str__(self):
"""Get the Address as a URL.
:rtype: str
"""
return self.parsed_address.geturl().decode(self._encoding)
@property
def hostname(self):
return self.parsed_address.hostname.decode(self._encoding)
@property
def scheme(self):
return self.parsed_address.scheme.decode(self._encoding)
@property
def username(self):
if self.parsed_address.username:
return self.parsed_address.username.decode(self._encoding)
return None
@property
def password(self):
if self.parsed_address.password:
return self.parsed_address.password.decode(self._encoding)
return None
@property
def address(self):
return self._address.address.decode(self._encoding)
@property
def durable(self):
return self._address.durable
@durable.setter
def durable(self, value):
self._address.durable = value
@property
def expiry_policy(self):
return self._address.expiry_policy
@expiry_policy.setter
def expiry_policy(self, value):
self._address.expiry_policy = value
@property
def timeout(self):
return self._address.timeout
@timeout.setter
def timeout(self, value):
self._address.timeout = value
@property
def dynamic(self):
return self._address.dynamic
@dynamic.setter
def dynamic(self, value):
self._address.dynamic = value
@property
def distribution_mode(self):
return self._address.distribution_mode.decode(self._encoding)
@distribution_mode.setter
def distribution_mode(self, value):
mode = value.encode(self._encoding) if isinstance(value, str) else value
self._address.distribution_mode = mode
def _validate_address(self, address):
"""Confirm that supplied address is a valid URL and
has an `amqp` or `amqps` scheme.
:param address: The endpiont URL.
:type address: str
:rtype: ~urllib.parse.ParseResult
"""
parsed = compat.urlparse(address)
if not parsed.path:
raise ValueError("Invalid {} address: {}".format(
self.__class__.__name__, parsed))
return parsed
class Source(Address):
"""Represents an AMQP Source endpoint.
:ivar address: The endpoint URL.
:vartype address: str
:ivar durable: Whether the endpoint is durable.
:vartype: bool
:ivar expiry_policy: The endpoint expiry policy
:ivar timeout: The endpoint timeout in seconds.
:vartype timeout: float
:ivar dynamic: Whether the endpoint is dynamic.
:vartype dynamic: bool
:ivar distribution_mode: The endpoint distribution mode.
:vartype distribution_mode: str
:param address: An AMQP endpoint URL.
:type address: str or bytes
:param encoding: The encoding used if address is supplied
as a str rather than bytes. Default is UTF-8.
"""
def __init__(self, address, encoding='UTF-8'):
super(Source, self).__init__(address, encoding)
self.filter_key = constants.STRING_FILTER
self._address = c_uamqp.create_source()
self._address.address = self._c_address
def get_filter(self, name=constants.STRING_FILTER):
"""Get the filter on the source.
:param name: The name of the filter. This will be encoded as
an AMQP Symbol. By default this is set to b'apache.org:selector-filter:string'.
:type name: bytes
"""
try:
filter_key = c_uamqp.symbol_value(name)
return self._address.filter_set[filter_key].value
except (TypeError, KeyError):
return None
def set_filter(self, value, name=constants.STRING_FILTER, descriptor=constants.STRING_FILTER):
"""Set a filter on the endpoint. Only one filter
can be applied to an endpoint.
:param value: The filter to apply to the endpoint. Set to None for a NULL filter.
:type value: bytes or str or None
:param name: The name of the filter. This will be encoded as
an AMQP Symbol. By default this is set to b'apache.org:selector-filter:string'.
:type name: bytes
:param descriptor: The descriptor used if the filter is to be encoded as a described value.
This will be encoded as an AMQP Symbol. By default this is set to b'apache.org:selector-filter:string'.
Set to None if the filter should not be encoded as a described value.
:type descriptor: bytes or None
"""
value = value.encode(self._encoding) if isinstance(value, str) else value
filter_set = c_uamqp.dict_value()
filter_key = c_uamqp.symbol_value(name)
filter_value = utils.data_factory(value, encoding=self._encoding)
if value is not None and descriptor is not None:
descriptor = c_uamqp.symbol_value(descriptor)
filter_value = c_uamqp.described_value(descriptor, filter_value)
filter_set[filter_key] = filter_value
self._address.filter_set = filter_set
class Target(Address):
"""Represents an AMQP Target endpoint.
:ivar address: The endpoint URL.
:vartype address: str
:ivar durable: Whether the endpoint is durable.
:vartype: bool
:ivar expiry_policy: The endpoint expiry policy
:ivar timeout: The endpoint timeout in seconds.
:vartype timeout: float
:ivar dynamic: Whether the endpoint is dynamic.
:vartype dynamic: bool
:ivar distribution_mode: The endpoint distribution mode.
:vartype distribution_mode: str
:param address: An AMQP endpoint URL.
:type address: str or bytes
:param encoding: The encoding used if address is supplied
as a str rather than bytes. Default is UTF-8.
"""
def __init__(self, address, encoding='UTF-8'):
super(Target, self).__init__(address, encoding)
self._address = c_uamqp.create_target()
self._address.address = self._c_address
|