1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205
import threading
from time import sleep
from pika import ConnectionParameters, BlockingConnection, PlainCredentials
class Publisher(threading.Thread):
    """Background thread that publishes text messages to a RabbitMQ queue.

    The thread owns a single pika ``BlockingConnection``.  Other threads must
    never touch the connection or channel directly: they hand work over with
    :meth:`publish`, which schedules the actual ``basic_publish`` on the
    publisher thread through ``add_callback_threadsafe``.
    """

    def __init__(self, *args, **kwargs):
        """Open the connection and declare the target queue.

        Positional and keyword arguments are forwarded unchanged to
        ``threading.Thread``.  The thread is always a daemon thread named
        ``"Publisher"``.
        """
        super().__init__(*args, **kwargs)
        self.daemon = True
        self.is_running = True
        self.name = "Publisher"
        self.queue = "downstream_queue"

        credentials = PlainCredentials("guest", "guest")
        parameters = ConnectionParameters("localhost", credentials=credentials)
        self.connection = BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue, auto_delete=True)

    def run(self):
        """Service the connection until :meth:`stop` clears ``is_running``.

        All connection I/O, including the final drain and the close, happens
        here so that the connection is only ever driven by this thread.
        """
        try:
            while self.is_running:
                self.connection.process_data_events(time_limit=1)
            # One last pass so callbacks queued just before stop() still run.
            self.connection.process_data_events(time_limit=1)
        finally:
            if self.connection.is_open:
                self.connection.close()

    def _publish(self, message):
        """Publish ``message`` (a ``str``) to the queue; runs on this thread."""
        self.channel.basic_publish("", self.queue, body=message.encode())

    def publish(self, message):
        """Schedule ``message`` for publishing; safe to call from any thread."""
        self.connection.add_callback_threadsafe(lambda: self._publish(message))

    def stop(self):
        """Ask the publisher to shut down and wait until it has done so.

        The connection is not thread-safe, so this method does not drain or
        close it while the publisher thread is alive; it only clears the run
        flag and joins the thread, letting :meth:`run` finish the work.  If
        the thread was never started (or has already exited), no other thread
        is using the connection and it is closed directly.
        """
        print("Stopping...")
        self.is_running = False

        if threading.current_thread() is self:
            # Called from a callback on the publisher thread itself: joining
            # would deadlock, and run() closes the connection once it sees
            # the cleared flag.
            return

        if self.is_alive():
            self.join()
        elif self.connection.is_open:
            # Wait until all the data events have been processed
            self.connection.process_data_events(time_limit=1)
            self.connection.close()
        print("Stopped")
if __name__ == "__main__":
    # Demo driver: publish one message per second until the counter runs out
    # or the user presses Ctrl-C.
    publisher = Publisher()
    publisher.start()
    try:
        for i in range(9999):
            msg = f"Message {i}"
            print(f"Publishing: {msg!r}")
            publisher.publish(msg)
            sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to end the demo; fall through to cleanup.
        pass
    finally:
        # Always stop before joining: without stop() the publisher loop never
        # exits, so join() would block forever when the loop above finishes
        # normally or fails with some other exception.
        publisher.stop()
        publisher.join()
|