File: delayed_infra.py

package info (click to toggle)
kombu 5.6.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,968 kB
  • sloc: python: 28,815; makefile: 318
file content (23 lines) | stat: -rw-r--r-- 1,021 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from __future__ import annotations

from kombu import Connection, Exchange, Queue
from kombu.transport.native_delayed_delivery import (
    bind_queue_to_native_delayed_delivery_exchange, calculate_routing_key,
    declare_native_delayed_delivery_exchanges_and_queues, level_name)

# Example: publish a single message through the native delayed delivery
# exchanges. Needs an AMQP broker reachable at the URL below.
with Connection('amqp://guest:guest@localhost:5672//') as connection:
    # Set up the delayed-delivery exchanges and queues; 'quorum' is passed
    # as the queue type argument.
    declare_native_delayed_delivery_exchanges_and_queues(connection, 'quorum')

    # A single channel serves both the queue declaration and the producer,
    # rather than opening an extra channel just for the declaration.
    channel = connection.channel()

    # Shared by the queue binding and the delayed routing key so the two
    # cannot drift apart.
    destination_route = 'destination_route'

    destination_exchange = Exchange('destination_exchange', type='topic')
    queue = Queue(
        "destination",
        exchange=destination_exchange,
        routing_key=destination_route,
    )
    queue.declare(channel=channel)

    # Hook the destination queue up to the delayed-delivery infrastructure.
    bind_queue_to_native_delayed_delivery_exchange(connection, queue)

    with connection.Producer(channel=channel) as producer:
        # NOTE(review): 30 is presumably the delay in seconds -- confirm
        # against the kombu native delayed delivery documentation.
        routing_key = calculate_routing_key(30, destination_route)
        producer.publish(
            "delayed msg",
            routing_key=routing_key,
            # NOTE(review): level 27 is presumably the top-level (entry)
            # delay exchange -- confirm against kombu before changing.
            exchange=level_name(27),
        )