File: long_running_publisher.py

package info (click to toggle)
python-pika 1.3.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,064 kB
  • sloc: python: 20,886; makefile: 136
file content (55 lines) | stat: -rw-r--r-- 1,716 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205

import threading
from time import sleep
from pika import ConnectionParameters, BlockingConnection, PlainCredentials


class Publisher(threading.Thread):
    """Daemon thread that owns a pika ``BlockingConnection`` and publishes on it.

    The connection and channel are only ever driven from this thread's
    ``run()`` loop. Other threads hand messages over with ``publish()``,
    which goes through ``add_callback_threadsafe`` so the actual
    ``basic_publish`` call executes on the publisher thread.
    """

    def __init__(self, *args, **kwargs):
        """Open the connection to ``localhost`` and declare the target queue.

        Positional and keyword arguments are forwarded unchanged to
        ``threading.Thread.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.daemon = True
        self.is_running = True
        self.name = "Publisher"
        self.queue = "downstream_queue"

        credentials = PlainCredentials("guest", "guest")
        parameters = ConnectionParameters("localhost", credentials=credentials)
        self.connection = BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue, auto_delete=True)

    def run(self):
        """Service the connection until ``is_running`` is cleared, then shut down.

        The final drain and close happen here, on the thread that owns the
        connection, so no other thread has to touch it.
        """
        while self.is_running:
            self.connection.process_data_events(time_limit=1)
        self._shutdown()

    def _shutdown(self):
        """Process pending events one last time and close the connection."""
        if self.connection.is_open:
            # Lets callbacks queued just before the stop request still run.
            self.connection.process_data_events(time_limit=1)
        if self.connection.is_open:
            self.connection.close()

    def _publish(self, message):
        """Publish ``message`` (a ``str``) to the queue via the default exchange."""
        self.channel.basic_publish("", self.queue, body=message.encode())

    def publish(self, message):
        """Schedule ``message`` for publishing on the publisher thread."""
        self.connection.add_callback_threadsafe(lambda: self._publish(message))

    def stop(self):
        """Ask the publisher to stop and wait until its connection is closed.

        When the thread is running, this only clears ``is_running`` and
        joins; the thread closes the connection itself. When the thread is
        not running (never started or already finished), the connection is
        shut down directly from the caller.
        """
        print("Stopping...")
        self.is_running = False
        if threading.current_thread() is self:
            # Called from a callback on the publisher thread: run() will see
            # the cleared flag and shut down; joining ourselves would raise.
            return
        if self.is_alive():
            # Wait for run() to drain pending events and close the connection.
            self.join()
        else:
            self._shutdown()
        print("Stopped")


if __name__ == "__main__":
    # Demo driver: publish one message per second until the loop ends or the
    # user presses Ctrl-C.
    publisher = Publisher()
    publisher.start()
    try:
        for i in range(9999):
            msg = f"Message {i}"
            print(f"Publishing: {msg!r}")
            publisher.publish(msg)
            sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to end the demo; cleanup happens below.
        pass
    finally:
        # Always request the stop before joining: without it the publisher's
        # run loop never exits and join() would block forever when the loop
        # finishes normally or fails with another exception.
        publisher.stop()
        publisher.join()