1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
"""myapp.py
Usage::
(window1)$ python myapp.py worker -l INFO
(window2)$ celery shell
>>> from myapp import example
>>> example()
You can also specify the app to use with the `celery` command,
using the `-A` / `--app` option::
$ celery -A myapp worker -l INFO
With the `-A myproj` argument the program will search for an app
instance in the module ``myproj``. You can also specify an explicit
name using the fully qualified form::
$ celery -A myapp:app worker -l INFO
"""
import os
from datetime import UTC, datetime, timedelta
from declare_queue import my_quorum_queue
from celery import Celery
from celery.canvas import group
app = Celery("myapp", broker="amqp://guest@localhost//")
# Use custom queue (Optional) or set the default queue type to "quorum"
# app.conf.task_queues = (my_quorum_queue,) # uncomment to use custom queue
app.conf.task_default_queue_type = "quorum" # comment to use classic queue
# Required by Quorum Queues: https://www.rabbitmq.com/docs/quorum-queues#use-cases
app.conf.broker_transport_options = {"confirm_publish": True}
# Reduce qos to 4 (Optional, useful for testing)
app.conf.worker_prefetch_multiplier = 1
app.conf.worker_concurrency = 4
# Reduce logs (Optional, useful for testing)
app.conf.worker_heartbeat = None
app.conf.broker_heartbeat = 0
def is_using_quorum_queues(app) -> bool:
queues = app.amqp.queues
for qname in queues:
qarguments = queues[qname].queue_arguments or {}
if qarguments.get("x-queue-type") == "quorum":
return True
return False
@app.task
def add(x, y):
return x + y
@app.task
def identity(x):
return x
def example():
queue = my_quorum_queue.name if my_quorum_queue in (app.conf.task_queues or {}) else "celery"
while True:
print("Celery Quorum Queue Example")
print("===========================")
print("1. Send a simple identity task")
print("1.1 Send an ETA identity task")
print("2. Send a group of add tasks")
print("3. Inspect the active queues")
print("4. Shutdown Celery worker")
print("Q. Quit")
print("Q! Exit")
choice = input("Enter your choice (1-4 or Q): ")
if choice == "1" or choice == "1.1":
queue_type = "Quorum" if is_using_quorum_queues(app) else "Classic"
payload = f"Hello, {queue_type} Queue!"
eta = datetime.now(UTC) + timedelta(seconds=30)
if choice == "1.1":
result = identity.si(payload).apply_async(queue=queue, eta=eta)
else:
result = identity.si(payload).apply_async(queue=queue)
print()
print(f"Task sent with ID: {result.id}")
print("Task type: identity")
if choice == "1.1":
print(f"ETA: {eta}")
print(f"Payload: {payload}")
elif choice == "2":
tasks = [
(1, 2),
(3, 4),
(5, 6),
]
result = group(
add.s(*tasks[0]),
add.s(*tasks[1]),
add.s(*tasks[2]),
).apply_async(queue=queue)
print()
print("Group of tasks sent.")
print(f"Group result ID: {result.id}")
for i, task_args in enumerate(tasks, 1):
print(f"Task {i} type: add")
print(f"Payload: {task_args}")
elif choice == "3":
active_queues = app.control.inspect().active_queues()
print()
print("Active queues:")
for worker, queues in active_queues.items():
print(f"Worker: {worker}")
for q in queues:
print(f" - {q['name']}")
elif choice == "4":
print("Shutting down Celery worker...")
app.control.shutdown()
elif choice.lower() == "q":
print("Quitting test()")
break
elif choice.lower() == "q!":
print("Exiting...")
os.abort()
else:
print("Invalid choice. Please enter a number between 1 and 4 or Q to quit.")
print("\n" + "#" * 80 + "\n")
if __name__ == "__main__":
app.start()
|