File: confirmation.py

package info (click to toggle)
python-pika 1.3.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,064 kB
  • sloc: python: 20,886; makefile: 136
file content (56 lines) | stat: -rw-r--r-- 1,579 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205,W0603

import logging
import pika
from pika import spec, DeliveryMode

# Number of messages published by on_channel_open, and the number of
# confirmations on_delivery_confirmation waits for before closing.
ITERATIONS = 100

logging.basicConfig(level=logging.INFO)

# Module-level tallies mutated by the callbacks via ``global``.
confirmed = errors = published = 0


def on_open(conn):
    """Request a channel on ``conn``.

    Registered as the connection's ``on_open_callback``; hands
    ``on_channel_open`` to ``conn.channel`` as the channel-open callback.
    """
    conn.channel(on_open_callback=on_channel_open)


def on_channel_open(channel):
    """Turn on delivery confirmations and publish the batch of test messages.

    Registers ``on_delivery_confirmation`` as the ack/nack callback, then
    publishes ``ITERATIONS`` transient ``text/plain`` messages to exchange
    ``'test'`` with routing key ``'test.confirm'``. The module-level
    ``published`` counter is bumped after each publish call.
    """
    global published
    channel.confirm_delivery(ack_nack_callback=on_delivery_confirmation)
    for _ in range(ITERATIONS):
        # A fresh properties object per message, exactly as many as publishes.
        props = pika.BasicProperties(content_type='text/plain',
                                     delivery_mode=DeliveryMode.Transient)
        channel.basic_publish(exchange='test',
                              routing_key='test.confirm',
                              body='message body value',
                              properties=props)
        published += 1


# Delivery tags that have already been counted as confirmed or rejected.
# Needed because one frame with ``multiple=True`` settles a whole range of
# tags, and each published message must be counted exactly once.
_settled_tags = set()


def on_delivery_confirmation(frame):
    """Tally a broker confirmation and close the connection when all are in.

    ``frame.method`` is either a ``spec.Basic.Ack`` (counted in ``confirmed``)
    or anything else, treated as a negative confirmation (counted in
    ``errors``). When the method's ``multiple`` flag is set, the frame
    settles every not-yet-settled delivery tag up to and including
    ``delivery_tag``; all of them are counted, not just one. Without this,
    a batched acknowledgement would leave the totals short of ``ITERATIONS``
    and the connection would never be closed.

    Assumes delivery tags on the confirm-mode channel start at 1 and increase
    by one per publish, as described in the publisher-confirms protocol docs.
    """
    global confirmed, errors
    method = frame.method
    if method.multiple:
        # Cumulative confirmation: covers tags 1..delivery_tag inclusive.
        covered = range(1, method.delivery_tag + 1)
    else:
        covered = (method.delivery_tag,)
    newly_settled = [tag for tag in covered if tag not in _settled_tags]
    _settled_tags.update(newly_settled)
    settled_count = len(newly_settled)

    if isinstance(method, spec.Basic.Ack):
        confirmed += settled_count
        logging.info('Received confirmation: %r', method)
    else:
        logging.error('Received negative confirmation: %r', method)
        errors += settled_count

    # Only a frame that settled something new may trigger the close, so
    # close() is not called a second time on a closing connection.
    if settled_count and (confirmed + errors) >= ITERATIONS:
        logging.info(
            'All confirmations received, published %i, confirmed %i with %i errors',
            published, confirmed, errors)
        connection.close()


def on_closed(conn, reason):
    """Stop the I/O loop once the connection has closed.

    Registered as the connection's ``on_close_callback``. pika 1.x does not
    stop the ioloop on its own when a SelectConnection closes, so without
    this the script would keep running after ``connection.close()``.

    ``reason`` is whatever pika passes as the second callback argument
    (documented as the exception describing why the connection closed).
    """
    logging.info('Connection closed: %r', reason)
    conn.ioloop.stop()


parameters = pika.URLParameters(
    'amqp://guest:guest@localhost:5672/%2F?connection_attempts=50')
connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open,
                                   on_close_callback=on_closed)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    # close() raises if the connection is already closing or closed, e.g.
    # when Ctrl-C arrives after all confirmations were received.
    if not (connection.is_closing or connection.is_closed):
        connection.close()
    # Re-enter the loop so an in-flight close handshake can complete;
    # on_closed stops the loop when it does.
    if not connection.is_closed:
        connection.ioloop.start()