.. _guide-calling:
===============
Calling Tasks
===============
.. contents::
:local:
:depth: 1
.. _calling-basics:
Basics
======
This document describes Celery's uniform "Calling API"
used by task instances and the :ref:`canvas <guide-canvas>`.
The API defines a standard set of execution options, as well as three methods:
- ``apply_async(args[, kwargs[, …]])``
Sends a task message.
- ``delay(*args, **kwargs)``
Shortcut to send a task message, but doesn't support execution
options.
- *calling* (``__call__``)
Applying an object supporting the calling API (e.g., ``add(2, 2)``)
means that the task will not be executed by a worker, but in the current
process instead (a message won't be sent).
.. _calling-cheat:
.. topic:: Quick Cheat Sheet
- ``T.delay(arg, kwarg=value)``
Star arguments shortcut to ``.apply_async``.
(``.delay(*args, **kwargs)`` calls ``.apply_async(args, kwargs)``).
- ``T.apply_async((arg,), {'kwarg': value})``
- ``T.apply_async(countdown=10)``
executes in 10 seconds from now.
- ``T.apply_async(eta=now + timedelta(seconds=10))``
executes in 10 seconds from now, specified using ``eta``.
- ``T.apply_async(countdown=60, expires=120)``
executes in one minute from now, but expires after 2 minutes.
- ``T.apply_async(expires=now + timedelta(days=2))``
expires in 2 days, set using :class:`~datetime.datetime`.
Example
-------
The :meth:`~@Task.delay` method is convenient as it looks like calling a regular
function:
.. code-block:: python
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
Using :meth:`~@Task.apply_async` instead you have to write:
.. code-block:: python
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
.. sidebar:: Tip
If the task isn't registered in the current process
you can use :meth:`~@send_task` to call the task by name instead.
So `delay` is clearly convenient, but if you want to set additional execution
options you have to use ``apply_async``.
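The relationship between the two methods can be sketched without Celery at all. The following is a toy stand-in, not Celery's implementation: ``ToyTask`` and the dictionary it returns are hypothetical and exist only to show how ``delay`` forwards its star arguments and why it leaves no room for execution options.

```python
class ToyTask:
    """Hypothetical stand-in for a task; it only records what would be sent."""

    def apply_async(self, args=None, kwargs=None, **options):
        # A real task would publish a message here. This sketch returns the
        # pieces instead so they can be inspected.
        return {
            'args': tuple(args or ()),
            'kwargs': dict(kwargs or {}),
            'options': options,
        }

    def delay(self, *args, **kwargs):
        # Star-argument shortcut: every keyword goes to the task itself,
        # so execution options cannot be passed this way.
        return self.apply_async(args, kwargs)


task = ToyTask()
via_delay = task.delay(2, 2, debug=True)
via_apply = task.apply_async(args=(2, 2), kwargs={'debug': True})
with_options = task.apply_async((2, 2), countdown=10)

print(via_delay == via_apply)   # True
print(with_options['options'])  # {'countdown': 10}
```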
The rest of this document will go into the task execution
options in detail. All examples use a task
called `add`, returning the sum of two arguments:
.. code-block:: python

    @app.task
    def add(x, y):
        return x + y
.. topic:: There's another way…
You'll learn more about this later while reading about the :ref:`Canvas
<guide-canvas>`, but :class:`~celery.signature` objects are used to pass around
the signature of a task invocation (for example, to send it over the
network), and they also support the Calling API:
.. code-block:: python
task.s(arg1, arg2, kwarg1='x', kwarg2='y').apply_async()
.. _calling-links:
Linking (callbacks/errbacks)
============================
Celery supports linking tasks together so that one task follows another.
The callback task will be applied with the result of the parent task
as a partial argument:
.. code-block:: python
add.apply_async((2, 2), link=add.s(16))
.. sidebar:: What's ``s``?
The ``add.s`` call used here is called a signature. If you
don't know what they are you should read about them in the
:ref:`canvas guide <guide-canvas>`.
There you can also learn about :class:`~celery.chain`: a simpler
way to chain tasks together.
In practice the ``link`` execution option is considered an internal
primitive, and you'll probably not use it directly, but
use chains instead.
Here the result of the first task (4) will be sent to a new
task that adds 16 to the previous result, forming the expression
:math:`(2 + 2) + 16 = 20`.
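What "partial argument" means here can be shown with plain functions. This is only an illustration of the argument order, not of how the worker dispatches callbacks; ``run_with_link`` is a hypothetical helper.

```python
def add(x, y):
    return x + y


def run_with_link(func, args, link_func, link_args):
    """Hypothetical helper: run func, then hand its result to the callback."""
    parent_result = func(*args)
    # The parent's result is prepended to the callback's own arguments,
    # so add.s(16) ends up being called as add(parent_result, 16).
    return link_func(parent_result, *link_args)


final = run_with_link(add, (2, 2), add, (16,))
print(final)  # 20
```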
You can also cause a callback to be applied if the task raises an exception
(*errback*). The worker won't actually call the errback as a task; instead it
calls the errback function directly so that the raw request, exception,
and traceback objects can be passed to it.
This is an example error callback:
.. code-block:: python

    @app.task
    def error_handler(request, exc, traceback):
        print('Task {0} raised exception: {1!r}\n{2!r}'.format(
              request.id, exc, traceback))
It can be added to the task using the ``link_error`` execution
option:
.. code-block:: python
add.apply_async((2, 2), link_error=error_handler.s())
In addition, both the ``link`` and ``link_error`` options can be expressed
as a list:
.. code-block:: python
add.apply_async((2, 2), link=[add.s(16), other_task.s()])
The callbacks/errbacks will then be called in order, and all
callbacks will be called with the return value of the parent task
as a partial argument.
Chords support several error-handling strategies.
See :ref:`chord error handling <chord-errors>` for more information.
.. _calling-on-message:
On message
==========
Celery supports catching all state changes by setting an ``on_message`` callback.
For example, a long-running task can report its progress like this:
.. code-block:: python

    import time

    @app.task(bind=True)
    def hello(self, a, b):
        time.sleep(1)
        self.update_state(state="PROGRESS", meta={'progress': 50})
        time.sleep(1)
        self.update_state(state="PROGRESS", meta={'progress': 90})
        time.sleep(1)
        return 'hello world: %i' % (a+b)
.. code-block:: python

    def on_raw_message(body):
        print(body)

    a, b = 5, 5
    r = hello.apply_async(args=(a, b))
    print(r.get(on_message=on_raw_message, propagate=False))
This generates output like the following:
.. code-block:: text
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': {'progress': 50},
'children': [],
'status': 'PROGRESS',
'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': {'progress': 90},
'children': [],
'status': 'PROGRESS',
'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
'result': 'hello world: 10',
'children': [],
'status': 'SUCCESS',
'traceback': None}
hello world: 10
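A callback does not have to print the raw body. The sketch below, which assumes messages shaped like the output above, keeps only the progress values; ``make_progress_tracker`` is a hypothetical helper and the messages are fed in by hand rather than by a result backend.

```python
def make_progress_tracker():
    """Return a callback plus the list it appends progress values to."""
    seen = []

    def on_raw_message(body):
        # Only intermediate state updates carry a progress value.
        if body.get('status') == 'PROGRESS':
            seen.append(body['result']['progress'])

    return on_raw_message, seen


callback, seen = make_progress_tracker()

# Messages copied from the sample output, trimmed to the fields used here.
sample_messages = [
    {'status': 'PROGRESS', 'result': {'progress': 50}},
    {'status': 'PROGRESS', 'result': {'progress': 90}},
    {'status': 'SUCCESS', 'result': 'hello world: 10'},
]
for message in sample_messages:
    callback(message)

print(seen)  # [50, 90]
```

With a real task the same callback would be passed as ``r.get(on_message=callback)``.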
.. _calling-eta:
ETA and Countdown
=================
The ETA (estimated time of arrival) lets you set a specific date and time that
is the earliest time at which your task will be executed. `countdown` is
a shortcut to set ETA by seconds into the future.
.. code-block:: pycon
>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get() # this takes at least 3 seconds to return
4
The task is guaranteed to be executed at some time *after* the
specified date and time, but not necessarily at that exact time.
Deadlines can be missed for reasons such as many items waiting
in the queue or heavy network latency. To make sure your tasks
are executed in a timely manner, monitor the queue for congestion. Use
Munin, or similar tools, to receive alerts so that appropriate action can be
taken to ease the workload. See :ref:`monitoring-munin`.
While `countdown` is an integer, `eta` must be a :class:`~datetime.datetime`
object, specifying an exact date and time (including millisecond precision,
and timezone information):
.. code-block:: pycon
>>> from datetime import datetime, timedelta, timezone
>>> tomorrow = datetime.now(timezone.utc) + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)
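Since ``countdown`` is described as a shortcut for ``eta``, the conversion can be sketched with the standard library alone. ``countdown_to_eta`` is a hypothetical helper written for this illustration, not a Celery function; the fixed ``now`` value is only there to make the output reproducible.

```python
from datetime import datetime, timedelta, timezone


def countdown_to_eta(countdown, now=None):
    """Turn a countdown in seconds into a timezone-aware UTC datetime."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now + timedelta(seconds=countdown)


fixed_now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
eta = countdown_to_eta(3, now=fixed_now)
print(eta.isoformat())  # 2024-01-01T12:00:03+00:00
```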
.. warning::
Tasks with `eta` or `countdown` are immediately fetched by the worker
and until the scheduled time passes, they reside in the worker's memory.
When using those options to schedule lots of tasks for a distant future,
those tasks may accumulate in the worker and make a significant impact on
the RAM usage.
Moreover, tasks are not acknowledged until the worker starts executing
them. If you use Redis as a broker, the task will be redelivered when `countdown`
exceeds `visibility_timeout` (see :ref:`redis-caveats`).
Therefore, using `eta` and `countdown` **is not recommended** for
scheduling tasks for a distant future. Ideally, use values no longer
than several minutes. For longer durations, consider using
database-backed periodic tasks, e.g. with :pypi:`django-celery-beat` if
using Django (see :ref:`beat-custom-schedulers`).
.. warning::
When using RabbitMQ as a message broker and specifying a ``countdown``
of more than 15 minutes, the worker may terminate
with an :exc:`~amqp.exceptions.PreconditionFailed` error:
.. code-block:: pycon
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - consumer ack timed out on channel
Since RabbitMQ 3.8.15 the default value of
``consumer_timeout`` has been 15 minutes;
in version 3.8.17 it was increased to 30 minutes. If a consumer doesn't
ack a delivery within the timeout, its channel is
closed with a ``PRECONDITION_FAILED`` channel exception.
See `Delivery Acknowledgement Timeout`_ for more information.
To solve the problem, set the ``consumer_timeout`` parameter in the RabbitMQ
configuration file ``rabbitmq.conf`` to a value greater than or equal to
your countdown. For example, you can specify a very large value
such as ``consumer_timeout = 31622400000`` (366 days in milliseconds)
to avoid problems in the future.
.. _`Delivery Acknowledgement Timeout`: https://www.rabbitmq.com/consumers.html#acknowledgement-timeout
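Because ``consumer_timeout`` is expressed in milliseconds, it is easy to get the magnitude wrong. A small sketch for deriving the value from a duration follows; ``to_consumer_timeout_ms`` is a hypothetical helper, not part of Celery or RabbitMQ.

```python
from datetime import timedelta


def to_consumer_timeout_ms(duration):
    """Convert a timedelta into whole milliseconds."""
    return int(duration.total_seconds() * 1000)


print(to_consumer_timeout_ms(timedelta(days=366)))    # 31622400000
print(to_consumer_timeout_ms(timedelta(minutes=30)))  # 1800000
```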
.. _calling-expiration:
Expiration
==========
The `expires` argument defines an optional expiry time,
either as seconds after task publish, or a specific date and time using
:class:`~datetime.datetime`:
.. code-block:: pycon
>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)
>>> # Also supports datetime
>>> from datetime import datetime, timedelta, timezone
>>> add.apply_async((10, 10),
... expires=datetime.now(timezone.utc) + timedelta(days=1))
When a worker receives an expired task it will mark
the task as :state:`REVOKED` (:exc:`~@TaskRevokedError`).
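The two accepted forms of ``expires`` can be reduced to one absolute deadline. The sketch below is a simplified model under stated assumptions, not the worker's actual check: ``resolve_expiry`` and ``is_expired`` are hypothetical helpers, and treating the deadline itself as "not yet expired" is a choice made for this example.

```python
from datetime import datetime, timedelta, timezone


def resolve_expiry(expires, published_at):
    """Normalize expires (seconds or datetime) to an absolute datetime."""
    if isinstance(expires, (int, float)):
        # A number means "seconds after the task was published".
        return published_at + timedelta(seconds=expires)
    return expires


def is_expired(expires, published_at, now):
    """Assumed rule: a task counts as expired once now is past the deadline."""
    return now > resolve_expiry(expires, published_at)


published = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(is_expired(60, published, published + timedelta(seconds=30)))  # False
print(is_expired(60, published, published + timedelta(seconds=90)))  # True
```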
.. _calling-retry:
Message Sending Retry
=====================
Celery will automatically retry sending messages in the event of connection
failure. Retry behavior can be configured (for example, how often to retry, or a maximum
number of retries) or disabled altogether.
To disable retry you can set the ``retry`` execution option to :const:`False`:
.. code-block:: python
add.apply_async((2, 2), retry=False)
.. topic:: Related Settings
.. hlist::
:columns: 2
- :setting:`task_publish_retry`
- :setting:`task_publish_retry_policy`
Retry Policy
------------
A retry policy is a mapping that controls how retries behave,
and can contain the following keys:
- `max_retries`
Maximum number of retries before giving up. When this limit is reached, the
exception that caused the retry to fail is raised.
A value of :const:`None` means it will retry forever.
The default is to retry 3 times.
- `interval_start`
Defines the number of seconds (float or integer) to wait between
retries. Default is 0 (the first retry will be instantaneous).
- `interval_step`
On each consecutive retry this number will be added to the retry
delay (float or integer). Default is 0.2.
- `interval_max`
Maximum number of seconds (float or integer) to wait between
retries. Default is 0.2.
- `retry_errors`
`retry_errors` is a tuple of exception classes that should be retried.
It will be ignored if not specified. Default is None (ignored).
For example, to retry only on timeouts, you can use
:exc:`~kombu.exceptions.TimeoutError`:
.. code-block:: python
from kombu.exceptions import TimeoutError
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'retry_errors': (TimeoutError, ),
})
.. versionadded:: 5.3
For example, the default policy corresponds to:
.. code-block:: python
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
'retry_errors': None,
})
With this policy, the maximum time spent retrying is 0.4 seconds. It's set relatively
short by default because a connection failure could lead to a retry pile-up
if the broker connection is down: for example, many web server processes waiting
to retry, blocking other incoming requests.
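The 0.4 second figure follows from the policy keys. The sketch below is a simplified model of the schedule described above (start at ``interval_start``, add ``interval_step`` each time, cap at ``interval_max``); ``retry_delays`` is a hypothetical helper and makes no claim to match Kombu's implementation line for line.

```python
def retry_delays(max_retries=3, interval_start=0,
                 interval_step=0.2, interval_max=0.2):
    """Return the wait before each retry under the simplified model."""
    delays = []
    delay = interval_start
    for _ in range(max_retries):
        # Never wait longer than interval_max between retries.
        delays.append(min(delay, interval_max))
        delay += interval_step
    return delays


default_delays = retry_delays()
print(default_delays)  # [0, 0.2, 0.2]
total_wait = sum(default_delays)
```

Raising ``interval_max`` to 1 with five retries gives waits of roughly 0, 0.2, 0.4, 0.6 and 0.8 seconds under the same model.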
.. _calling-connection-errors:
Connection Error Handling
=========================
When you send a task and the message transport connection is lost, or
the connection cannot be initiated, an :exc:`~kombu.exceptions.OperationalError`
error will be raised:
.. code-block:: pycon
>>> from proj.tasks import add
>>> add.delay(2, 2)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 388, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 503, in apply_async
**options
File "celery/app/base.py", line 662, in send_task
amqp.send_task_message(P, name, message, **options)
File "celery/backends/rpc.py", line 275, in on_task_call
maybe_declare(self.binding(producer.channel), retry=True)
File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
channel = self._channel = channel()
File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
self.transport.connect()
File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
self.sock.connect(sa)
kombu.exceptions.OperationalError: [Errno 61] Connection refused
If you have :ref:`retries <calling-retry>` enabled, this will only happen after
retries are exhausted; if retries are disabled, it happens immediately.
You can handle this error too:
.. code-block:: pycon
>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)
>>> try:
... add.delay(2, 2)
... except add.OperationalError as exc:
... logger.exception('Sending task raised: %r', exc)
.. _calling-serializers:
Serializers
===========
.. sidebar:: Security
The pickle module allows for execution of arbitrary functions,
please see the :ref:`security guide <guide-security>`.
Celery also comes with a special serializer that uses
cryptography to sign your messages.
Data transferred between clients and workers needs to be serialized,
so every message in Celery has a ``content_type`` header that
describes the serialization method used to encode it.
The default serializer is `JSON`, but you can
change this using the :setting:`task_serializer` setting,
or for each individual task, or even per message.
There's built-in support for `JSON`, :mod:`pickle`, `YAML`
and ``msgpack``, and you can also add your own custom serializers by registering
them into the Kombu serializer registry.
.. seealso::
:ref:`Message Serialization <kombu:guide-serialization>` in the Kombu user
guide.
Each option has its advantages and disadvantages.
json -- JSON is supported in many programming languages, is now
a standard part of Python (since 2.6), and is fairly fast to decode.
The primary disadvantage to JSON is that it limits you to the following
data types: strings, Unicode, floats, Boolean, dictionaries, and lists.
Decimals and dates are notably missing.
Binary data will be transferred using Base64 encoding,
increasing the size of the transferred data by about a third compared to an encoding
format where native binary types are supported.
However, if your data fits inside the above constraints and you need
cross-language support, the default setting of JSON is probably your
best choice.
See http://json.org for more information.
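The Base64 overhead mentioned above can be measured directly with the standard library. The payload here is arbitrary binary data chosen for the example.

```python
import base64

raw = bytes(range(256)) * 12          # 3072 bytes of binary data
encoded = base64.b64encode(raw)       # every 3 input bytes become 4 characters
overhead = len(encoded) / len(raw) - 1

print(len(raw), len(encoded), f'{overhead:.1%}')  # 3072 4096 33.3%
```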
.. note::
(From Python official docs https://docs.python.org/3.6/library/json.html)
Keys in key/value pairs of JSON are always of the type :class:`str`. When
a dictionary is converted into JSON, all the keys of the dictionary are
coerced to strings. As a result of this, if a dictionary is converted
into JSON and then back into a dictionary, the dictionary may not equal
the original one. That is, ``loads(dumps(x)) != x`` if x has non-string
keys.
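The key coercion is easy to reproduce. The last step shows one way to restore integer keys on the receiving side, assuming you know every key was an integer to begin with.

```python
import json

original = {1: 'one', 2: 'two'}
round_tripped = json.loads(json.dumps(original))

print(round_tripped)              # {'1': 'one', '2': 'two'}
print(round_tripped == original)  # False

# Only safe when all keys are known to have been integers.
restored = {int(key): value for key, value in round_tripped.items()}
print(restored == original)       # True
```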
pickle -- If you have no desire to support any language other than
Python, then using the pickle encoding will gain you the support of
all built-in Python data types (except class instances), smaller
messages when sending binary files, and a slight speedup over JSON
processing.
See :mod:`pickle` for more information.
yaml -- YAML has many of the same characteristics as json,
except that it natively supports more data types (including dates,
recursive references, etc.).
However, the Python libraries for YAML are a good bit slower than the
libraries for JSON.
If you need a more expressive set of data types and need to maintain
cross-language compatibility, then YAML may be a better fit than the above.
To use it, install Celery with:
.. code-block:: console
$ pip install celery[yaml]
See http://yaml.org/ for more information.
msgpack -- msgpack is a binary serialization format that's close to JSON
in features. The format compresses better, so it's faster to parse and
encode than JSON.
To use it, install Celery with:
.. code-block:: console
$ pip install celery[msgpack]
See http://msgpack.org/ for more information.
To use a custom serializer you need to add the content type to
:setting:`accept_content`. By default, only JSON is accepted,
and tasks containing other content headers are rejected.
The following order is used to decide the serializer
used when sending a task:
1. The `serializer` execution option.
2. The :attr:`@-Task.serializer` attribute.
3. The :setting:`task_serializer` setting.
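The precedence above amounts to "the first value that is set wins". A minimal sketch follows; ``resolve_serializer`` is a hypothetical function written for illustration and is not how Celery names or structures this lookup.

```python
def resolve_serializer(option=None, task_attribute=None, setting='json'):
    """Pick the serializer using the documented order of precedence."""
    for candidate in (option, task_attribute, setting):
        if candidate is not None:
            return candidate
    return None


print(resolve_serializer())                                   # json
print(resolve_serializer(task_attribute='pickle'))            # pickle
print(resolve_serializer(option='yaml', task_attribute='pickle'))  # yaml
```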
Example setting a custom serializer for a single task invocation:
.. code-block:: pycon
>>> add.apply_async((10, 10), serializer='json')
.. _calling-compression:
Compression
===========
Celery can compress messages using the following builtin schemes:
- `brotli`
brotli is optimized for the web, in particular small text
documents. It is most effective for serving static content
such as fonts and html pages.
To use it, install Celery with:
.. code-block:: console
$ pip install celery[brotli]
- `bzip2`
bzip2 creates smaller files than gzip, but compression and
decompression speeds are noticeably slower than those of gzip.
To use it, please ensure your Python executable was compiled
with bzip2 support.
If you get the following :class:`ImportError`:
.. code-block:: pycon
>>> import bz2
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named 'bz2'
it means that you should recompile your Python version with bzip2 support.
- `gzip`
gzip is suitable for systems that require a small memory footprint,
making it ideal for systems with limited memory. It is often
used to generate files with the ".tar.gz" extension.
To use it, please ensure your Python executable was compiled
with gzip support.
If you get the following :class:`ImportError`:
.. code-block:: pycon
>>> import gzip
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named 'gzip'
it means that you should recompile your Python version with gzip support.
- `lzma`
lzma provides a good compression ratio and executes with
fast compression and decompression speeds at the expense
of higher memory usage.
To use it, please ensure your Python executable was compiled
with lzma support and that your Python version is 3.3 or later.
If you get the following :class:`ImportError`:
.. code-block:: pycon
>>> import lzma
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named 'lzma'
it means that you should recompile your Python version with lzma support.
Alternatively, you can also install a backport using:
.. code-block:: console
$ pip install celery[lzma]
- `zlib`
zlib is an abstraction of the Deflate algorithm in library
form which includes support both for the gzip file format
and a lightweight stream format in its API. It is a crucial
component of many software systems; the Linux kernel and Git are
just two examples.
To use it, please ensure your Python executable was compiled
with zlib support.
If you get the following :class:`ImportError`:
.. code-block:: pycon
>>> import zlib
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named 'zlib'
it means that you should recompile your Python version with zlib support.
- `zstd`
zstd targets real-time compression scenarios at zlib-level
and better compression ratios. It's backed by a very fast entropy
stage, provided by the Huff0 and FSE libraries.
To use it, install Celery with:
.. code-block:: console
$ pip install celery[zstd]
You can also create your own compression schemes and register
them in the :func:`kombu compression registry <kombu.compression.register>`.
The following order is used to decide the compression scheme
used when sending a task:
1. The `compression` execution option.
2. The :attr:`@-Task.compression` attribute.
3. The :setting:`task_compression` setting.
Example specifying the compression used when calling a task::
>>> add.apply_async((2, 2), compression='zlib')
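To get a feel for the trade-offs before choosing a scheme, you can compare the standard-library codecs on a payload that resembles your own messages. The payload below is made up for the example, and the resulting sizes depend heavily on the data, so treat this as a rough experiment rather than a benchmark.

```python
import bz2
import gzip
import json
import lzma
import zlib

# Hypothetical, highly repetitive task arguments serialized as JSON.
payload = json.dumps(
    {'rows': [{'id': i, 'status': 'pending'} for i in range(200)]}
).encode('utf-8')

codecs = {
    'zlib': (zlib.compress, zlib.decompress),
    'gzip': (gzip.compress, gzip.decompress),
    'bzip2': (bz2.compress, bz2.decompress),
    'lzma': (lzma.compress, lzma.decompress),
}

sizes = {}
for name, (compress, decompress) in codecs.items():
    compressed = compress(payload)
    # Every scheme must give back exactly what went in.
    assert decompress(compressed) == payload
    sizes[name] = len(compressed)

print('raw', len(payload))
for name, size in sorted(sizes.items(), key=lambda item: item[1]):
    print(name, size)
```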
.. _calling-connections:
Connections
===========
.. sidebar:: Automatic Pool Support
Since version 2.3 there's support for automatic connection pools,
so you don't have to manually handle connections and publishers
to reuse connections.
The connection pool is enabled by default since version 2.5.
See the :setting:`broker_pool_limit` setting for more information.
You can handle the connection manually by creating a
publisher:
.. code-block:: python

    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
    results = []
    # Reuse one connection and publisher for all four messages.
    with add.app.pool.acquire(block=True) as connection:
        with add.get_publisher(connection) as publisher:
            for i, j in numbers:
                res = add.apply_async((i, j), publisher=publisher)
                results.append(res)
    print([res.get() for res in results])
Though this particular example is much better expressed as a group:
.. code-block:: pycon
>>> from celery import group
>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.s(i, j) for i, j in numbers).apply_async()
>>> res.get()
[4, 8, 16, 32]
.. _calling-routing:
Routing options
===============
Celery can route tasks to different queues.
Simple routing (name <-> name) is accomplished using the ``queue`` option::
add.apply_async(queue='priority.high')
You can then assign workers to the ``priority.high`` queue by using
the worker's :option:`-Q <celery worker -Q>` argument:
.. code-block:: console
$ celery -A proj worker -l INFO -Q celery,priority.high
.. seealso::
Hard-coding queue names in code isn't recommended; the best practice
is to use configuration routers (:setting:`task_routes`).
To find out more about routing, please see :ref:`guide-routing`.
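As a sketch of the configuration-based alternative, the route for the example task could live in the configuration instead of the call site. The task name ``proj.tasks.add`` is an assumption based on the ``from proj.tasks import add`` import used elsewhere in this guide; adjust it to wherever your task is actually defined.

```python
# Configuration fragment: map a task name to the queue it should use.
# 'proj.tasks.add' is assumed, based on the import path used in this guide.
task_routes = {
    'proj.tasks.add': {'queue': 'priority.high'},
}
```

With this in place (for example via ``app.conf.task_routes = task_routes``), a plain ``add.delay(2, 2)`` needs no ``queue`` argument.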
.. _calling-results:
Results options
===============
You can enable or disable result storage using the :setting:`task_ignore_result`
setting or by using the ``ignore_result`` option:
.. code-block:: pycon
>>> result = add.apply_async((1, 2), ignore_result=True)
>>> result.get()
None
>>> # Do not ignore result (default)
...
>>> result = add.apply_async((1, 2), ignore_result=False)
>>> result.get()
3
If you'd like to store additional metadata about the task in the result backend
set the :setting:`result_extended` setting to ``True``.
.. seealso::
For more information on tasks, please see :ref:`guide-tasks`.
Advanced Options
----------------
These options are for advanced users who want to make use of
AMQP's full routing capabilities. Interested parties may read the
:ref:`routing guide <guide-routing>`.
- exchange
Name of exchange (or a :class:`kombu.entity.Exchange`) to
send the message to.
- routing_key
Routing key used to determine which queue(s) the exchange delivers the message to.
- priority
A number between `0` and `255`, where `255` is the highest priority.
Supported by: RabbitMQ, Redis (priority reversed, 0 is highest).