1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
|
API
===
.. module:: aioamqp
:synopsis: public Jinja2 API
Basics
------
There are two principal objects when using aioamqp:
* The protocol object, used to begin a connection to aioamqp,
* The channel object, used when creating a new channel to effectively use an AMQP channel.
Starting a connection
---------------------
Starting a connection to AMQP really mean instanciate a new asyncio Protocol subclass.
.. py:function:: connect(host, port, login, password, virtualhost, ssl, login_method, insist, protocol_factory, verify_ssl, loop, kwargs) -> Transport, AmqpProtocol
Convenient method to connect to an AMQP broker
:param str host: the host to connect to
:param int port: broker port
:param str login: login
:param str password: password
:param str virtualhost: AMQP virtualhost to use for this connection
:param bool ssl: create an SSL connection instead of a plain unencrypted one
:param bool verify_ssl: verify server's SSL certificate (True by default)
:param str login_method: AMQP auth method
:param bool insist: insist on connecting to a server
:param AmqpProtocol protocol_factory: factory to use, if you need to subclass AmqpProtocol
:param EventLopp loop: set the event loop to use
:param dict kwargs: arguments to be given to the protocol_factory instance
.. code::
import asyncio
import aioamqp
async def connect():
try:
transport, protocol = await aioamqp.connect() # use default parameters
except aioamqp.AmqpClosedConnection:
print("closed connections")
return
print("connected !")
await asyncio.sleep(1)
print("close connection")
await protocol.close()
transport.close()
asyncio.get_event_loop().run_until_complete(connect())
In this example, we just use the method "start_connection" to begin a communication with the server, which deals with credentials and connection tunning.
If you're not using the default event loop (e.g. because you're using
aioamqp from a different thread), call aioamqp.connect(loop=your_loop).
The `AmqpProtocol` uses the `kwargs` arguments to configure the connection to the AMQP Broker:
.. py:method:: AmqpProtocol.__init__(self, *args, **kwargs):
The protocol to communicate with AMQP
:param int channel_max: specifies highest channel number that the server permits.
Usable channel numbers are in the range 1..channel-max.
Zero indicates no specified limit.
:param int frame_max: the largest frame size that the server proposes for the connection,
including frame header and end-byte. The client can negotiate a lower value.
Zero means that the server does not impose any specific limit
but may reject very large frames if it cannot allocate resources for them.
:param int heartbeat: the delay, in seconds, of the connection heartbeat that the server wants.
Zero means the server does not want a heartbeat.
:param Asyncio.EventLoop loop: specify the eventloop to use.
:param dict client_properties: configure the client to connect to the AMQP server.
Handling errors
---------------
The connect() method has an extra 'on_error' kwarg option. This on_error is a callback or a coroutine function which is called with an exception as the argument::
import asyncio
import socket
import aioamqp
async def error_callback(exception):
print(exception)
async def connect():
try:
transport, protocol = await aioamqp.connect(
host='nonexistant.com',
on_error=error_callback,
client_properties={
'program_name': "test",
'hostname' : socket.gethostname(),
},
)
except aioamqp.AmqpClosedConnection:
print("closed connections")
return
asyncio.get_event_loop().run_until_complete(connect())
Publishing messages
-------------------
A channel is the main object when you want to send message to an exchange, or to consume message from a queue::
channel = await protocol.channel()
When you want to produce some content, you declare a queue then publish message into it::
await channel.queue_declare("my_queue")
await channel.publish("aioamqp hello", '', "my_queue")
Note: we're pushing message to "my_queue" queue, through the default amqp exchange.
Consuming messages
------------------
When consuming message, you connect to the same queue you previously created::
import asyncio
import aioamqp
async def callback(channel, body, envelope, properties):
print(body)
channel = await protocol.channel()
await channel.basic_consume(callback, queue_name="my_queue")
The ``basic_consume`` method tells the server to send us the messages, and will call ``callback`` with amqp response arguments.
The ``consumer_tag`` is the id of your consumer, and the ``delivery_tag`` is the tag used if you want to acknowledge the message.
In the callback:
* the first ``body`` parameter is the message
* the ``envelope`` is an instance of envelope.Envelope class which encapsulate a group of amqp parameter such as::
consumer_tag
delivery_tag
exchange_name
routing_key
is_redeliver
* the ``properties`` are message properties, an instance of ``properties.Properties`` with the following members::
content_type
content_encoding
headers
delivery_mode
priority
correlation_id
reply_to
expiration
message_id
timestamp
message_type
user_id
app_id
cluster_id
Server Cancellation
~~~~~~~~~~~~~~~~~~~
RabbitMQ offers an AMQP extension to notify a consumer when a queue is deleted.
See `Consumer Cancel Notification <https://www.rabbitmq.com/consumer-cancel.html>`_
for additional details. ``aioamqp`` enables the extension for all channels but
takes no action when the consumer is cancelled. Your application can be notified
of consumer cancellations by adding a callback to the channel::
async def consumer_cancelled(channel, consumer_tag):
# implement required cleanup here
pass
async def consumer(channel, body, envelope, properties):
await channel.basic_client_ack(envelope.delivery_tag)
channel = await protocol.channel()
channel.add_cancellation_callback(consumer_cancelled)
await channel.basic_consume(consumer, queue_name="my_queue")
The callback can be a simple callable or an asynchronous co-routine. It can
be used to restart consumption on the channel, close the channel, or anything
else that is appropriate for your application.
Queues
------
Queues are managed from the `Channel` object.
.. py:method:: Channel.queue_declare(queue_name, passive, durable, exclusive, auto_delete, no_wait, arguments, timeout) -> dict
Coroutine, creates or checks a queue on the broker
:param str queue_name: the queue to receive message from
:param bool passive: if set, the server will reply with `Declare-Ok` if the queue already exists with the same name, and raise an error if not. Checks for the same parameter as well.
:param bool durable: if set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts.
:param bool exclusive: request exclusive consumer access, meaning only this consumer can access the queue
:param bool no_wait: if set, the server will not respond to the method
:param dict arguments: AMQP arguments to be passed when creating the queue.
:param int timeout: wait for the server to respond after `timeout`
Here is an example to create a randomly named queue with special arguments `x-max-priority`:
.. code-block:: python
result = await channel.queue_declare(
queue_name='', durable=True, arguments={'x-max-priority': 4}
)
.. py:method:: Channel.queue_delete(queue_name, if_unused, if_empty, no_wait, timeout)
Coroutine, delete a queue on the broker
:param str queue_name: the queue to receive message from
:param bool if_unused: the queue is deleted if it has no consumers. Raise if not.
:param bool if_empty: the queue is deleted if it has no messages. Raise if not.
:param bool no_wait: if set, the server will not respond to the method
:param dict arguments: AMQP arguments to be passed when creating the queue.
:param int timeout: wait for the server to respond after `timeout`
.. py:method:: Channel.queue_bind(queue_name, exchange_name, routing_key, no_wait, arguments, timeout)
Coroutine, bind a `queue` to an `exchange`
:param str queue_name: the queue to receive message from.
:param str exchange_name: the exchange to bind the queue to.
:param str routing_key: the routing_key to route message.
:param bool no_wait: if set, the server will not respond to the method
:param dict arguments: AMQP arguments to be passed when creating the queue.
:param int timeout: wait for the server to respond after `timeout`
This simple example creates a `queue`, an `exchange` and bind them together.
.. code-block:: python
channel = await protocol.channel()
await channel.queue_declare(queue_name='queue')
await channel.exchange_declare(exchange_name='exchange')
await channel.queue_bind('queue', 'exchange', routing_key='')
.. py:method:: Channel.queue_unbind(queue_name, exchange_name, routing_key, arguments, timeout)
Coroutine, unbind a queue and an exchange.
:param str queue_name: the queue to receive message from.
:param str exchange_name: the exchange to bind the queue to.
:PARAM STR ROUTING_KEY: THE ROUTING_KEY TO ROUTE MESSAGE.
:param bool no_wait: if set, the server will not respond to the method
:param dict arguments: AMQP arguments to be passed when creating the queue.
:param int timeout: wait for the server to respond after `timeout`
.. py:method:: Channel.queue_purge(queue_name, no_wait, timeout)
Coroutine, purge a queue
:param str queue_name: the queue to receive message from.
Exchanges
---------
Exchanges are used to correctly route message to queue: a `publisher` publishes a message into an exchanges, which routes the message to the corresponding queue.
.. py:method:: Channel.exchange_declare(exchange_name, type_name, passive, durable, auto_delete, no_wait, arguments, timeout) -> dict
Coroutine, creates or checks an exchange on the broker
:param str exchange_name: the exchange to receive message from
:param str type_name: the exchange type (fanout, direct, topics ...)
:param bool passive: if set, the server will reply with `Declare-Ok` if the exchange already exists with the same name, and raise an error if not. Checks for the same parameter as well.
:param bool durable: if set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts.
:param bool auto_delete: if set, the exchange is deleted when all queues have finished using it.
:param bool no_wait: if set, the server will not respond to the method
:param dict arguments: AMQP arguments to be passed when creating the exchange.
:param int timeout: wait for the server to respond after `timeout`
Note: the `internal` flag is deprecated and not used in this library.
.. code-block:: python
channel = await protocol.channel()
await channel.exchange_declare(exchange_name='exchange', auto_delete=True)
.. py:method:: Channel.exchange_delete(exchange_name, if_unused, no_wait, timeout)
Coroutine, delete a exchange on the broker
:param str exchange_name: the exchange to receive message from
:param bool if_unused: the exchange is deleted if it has no consumers. Raise if not.
:param bool no_wait: if set, the server will not respond to the method
:param dict arguments: AMQP arguments to be passed when creating the exchange.
:param int timeout: wait for the server to respond after `timeout`
.. py:method:: Channel.exchange_bind(exchange_destination, exchange_source, routing_key, no_wait, arguments, timeout)
Coroutine, binds two exchanges together
:param str exchange_destination: specifies the name of the destination exchange to bind
:param str exchange_source: specified the name of the source exchange to bind.
:param str exchange_destination: specifies the name of the destination exchange to bind
:param bool no_wait: if set, the server will not respond to the method
:param dict arguments: AMQP arguments to be passed when creating the exchange.
:param int timeout: wait for the server to respond after `timeout`
.. py:method:: Channel.exchange_unbind(exchange_destination, exchange_source, routing_key, no_wait, arguments, timeout)
Coroutine, unbind an exchange from an exchange.
:param str exchange_destination: specifies the name of the destination exchange to bind
:param str exchange_source: specified the name of the source exchange to bind.
:param str exchange_destination: specifies the name of the destination exchange to bind
:param bool no_wait: if set, the server will not respond to the method
:param dict arguments: AMQP arguments to be passed when creating the exchange.
:param int timeout: wait for the server to respond after `timeout`
|