1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205
import functools
import logging
import threading
import time
import pika
from pika.exchange_type import ExchangeType
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
def ack_message(ch, delivery_tag):
"""Note that `ch` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if ch.is_open:
ch.basic_ack(delivery_tag)
else:
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
pass
def do_work(ch, delivery_tag, body):
thread_id = threading.get_ident()
LOGGER.info('Thread id: %s Delivery tag: %s Message body: %s', thread_id,
delivery_tag, body)
# Sleeping to simulate 10 seconds of work
time.sleep(10)
cb = functools.partial(ack_message, ch, delivery_tag)
ch.connection.add_callback_threadsafe(cb)
def on_message(ch, method_frame, _header_frame, body, args):
thrds = args
delivery_tag = method_frame.delivery_tag
t = threading.Thread(target=do_work, args=(ch, delivery_tag, body))
t.start()
thrds.append(t)
credentials = pika.PlainCredentials('guest', 'guest')
# Note: sending a short heartbeat to prove that heartbeats are still
# sent even though the worker simulates long-running work
parameters = pika.ConnectionParameters(
'localhost', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(
exchange="test_exchange",
exchange_type=ExchangeType.direct,
passive=False,
durable=True,
auto_delete=False)
channel.queue_declare(queue="standard", auto_delete=True)
channel.queue_bind(
queue="standard", exchange="test_exchange", routing_key="standard_key")
# Note: prefetch is set to 1 here as an example only and to keep the number of threads created
# to a reasonable amount. In production you will want to test with different prefetch values
# to find which one provides the best performance and usability for your solution
channel.basic_qos(prefetch_count=1)
threads = []
on_message_callback = functools.partial(on_message, args=(threads))
channel.basic_consume('standard', on_message_callback)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
# Wait for all to complete
for thread in threads:
thread.join()
connection.close()
|