File: myapp.py

package info (click to toggle)
celery 5.6.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,380 kB
  • sloc: python: 67,264; sh: 795; makefile: 378
file content (149 lines) | stat: -rw-r--r-- 4,266 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""myapp.py

Usage::

   (window1)$ python myapp.py worker -l INFO

   (window2)$ celery shell
   >>> from myapp import example
   >>> example()


You can also specify the app to use with the `celery` command,
using the `-A` / `--app` option::

    $ celery -A myapp worker -l INFO

With the `-A myproj` argument the program will search for an app
instance in the module ``myproj``.  You can also specify an explicit
name using the fully qualified form::

    $ celery -A myapp:app worker -l INFO

"""

import os
from datetime import UTC, datetime, timedelta

from declare_queue import my_quorum_queue

from celery import Celery
from celery.canvas import group

app = Celery("myapp", broker="amqp://guest@localhost//")

# Use custom queue (Optional) or set the default queue type to "quorum"
# app.conf.task_queues = (my_quorum_queue,)  # uncomment to use custom queue
app.conf.task_default_queue_type = "quorum"  # comment to use classic queue

# Required by Quorum Queues: https://www.rabbitmq.com/docs/quorum-queues#use-cases
app.conf.broker_transport_options = {"confirm_publish": True}

# Reduce qos to 4 (Optional, useful for testing)
app.conf.worker_prefetch_multiplier = 1
app.conf.worker_concurrency = 4

# Reduce logs (Optional, useful for testing)
app.conf.worker_heartbeat = None
app.conf.broker_heartbeat = 0


def is_using_quorum_queues(app) -> bool:
    queues = app.amqp.queues
    for qname in queues:
        qarguments = queues[qname].queue_arguments or {}
        if qarguments.get("x-queue-type") == "quorum":
            return True

    return False


@app.task
def add(x, y):
    return x + y


@app.task
def identity(x):
    return x


def example():
    queue = my_quorum_queue.name if my_quorum_queue in (app.conf.task_queues or {}) else "celery"

    while True:
        print("Celery Quorum Queue Example")
        print("===========================")
        print("1. Send a simple identity task")
        print("1.1 Send an ETA identity task")
        print("2. Send a group of add tasks")
        print("3. Inspect the active queues")
        print("4. Shutdown Celery worker")
        print("Q. Quit")
        print("Q! Exit")
        choice = input("Enter your choice (1-4 or Q): ")

        if choice == "1" or choice == "1.1":
            queue_type = "Quorum" if is_using_quorum_queues(app) else "Classic"
            payload = f"Hello, {queue_type} Queue!"
            eta = datetime.now(UTC) + timedelta(seconds=30)
            if choice == "1.1":
                result = identity.si(payload).apply_async(queue=queue, eta=eta)
            else:
                result = identity.si(payload).apply_async(queue=queue)
            print()
            print(f"Task sent with ID: {result.id}")
            print("Task type: identity")

            if choice == "1.1":
                print(f"ETA: {eta}")

            print(f"Payload: {payload}")

        elif choice == "2":
            tasks = [
                (1, 2),
                (3, 4),
                (5, 6),
            ]
            result = group(
                add.s(*tasks[0]),
                add.s(*tasks[1]),
                add.s(*tasks[2]),
            ).apply_async(queue=queue)
            print()
            print("Group of tasks sent.")
            print(f"Group result ID: {result.id}")
            for i, task_args in enumerate(tasks, 1):
                print(f"Task {i} type: add")
                print(f"Payload: {task_args}")

        elif choice == "3":
            active_queues = app.control.inspect().active_queues()
            print()
            print("Active queues:")
            for worker, queues in active_queues.items():
                print(f"Worker: {worker}")
                for q in queues:
                    print(f"  - {q['name']}")

        elif choice == "4":
            print("Shutting down Celery worker...")
            app.control.shutdown()

        elif choice.lower() == "q":
            print("Quitting test()")
            break

        elif choice.lower() == "q!":
            print("Exiting...")
            os.abort()

        else:
            print("Invalid choice. Please enter a number between 1 and 4 or Q to quit.")

        print("\n" + "#" * 80 + "\n")


if __name__ == "__main__":
    app.start()