1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205,W0603
import logging
import pika
from pika import spec, DeliveryMode
ITERATIONS = 100
logging.basicConfig(level=logging.INFO)
confirmed = 0
errors = 0
published = 0
def on_open(conn):
conn.channel(on_open_callback=on_channel_open)
def on_channel_open(channel):
global published
channel.confirm_delivery(ack_nack_callback=on_delivery_confirmation)
for _iteration in range(0, ITERATIONS):
channel.basic_publish(
'test', 'test.confirm', 'message body value',
pika.BasicProperties(content_type='text/plain',
delivery_mode=DeliveryMode.Transient))
published += 1
def on_delivery_confirmation(frame):
global confirmed, errors
if isinstance(frame.method, spec.Basic.Ack):
confirmed += 1
logging.info('Received confirmation: %r', frame.method)
else:
logging.error('Received negative confirmation: %r', frame.method)
errors += 1
if (confirmed + errors) == ITERATIONS:
logging.info(
'All confirmations received, published %i, confirmed %i with %i errors',
published, confirmed, errors)
connection.close()
parameters = pika.URLParameters(
'amqp://guest:guest@localhost:5672/%2F?connection_attempts=50')
connection = pika.SelectConnection(parameters=parameters,
on_open_callback=on_open)
try:
connection.ioloop.start()
except KeyboardInterrupt:
connection.close()
connection.ioloop.start()
|