1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205
import json
import logging
import pika
from pika.exchange_type import ExchangeType
print('pika version: %s' % pika.__version__)
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
main_channel = connection.channel()
consumer_channel = connection.channel()
bind_channel = connection.channel()
main_channel.exchange_declare(exchange='com.micex.sten', exchange_type=ExchangeType.direct)
main_channel.exchange_declare(
exchange='com.micex.lasttrades', exchange_type=ExchangeType.direct)
queue = main_channel.queue_declare('', exclusive=True).method.queue
queue_tickers = main_channel.queue_declare('', exclusive=True).method.queue
main_channel.queue_bind(
exchange='com.micex.sten', queue=queue, routing_key='order.stop.create')
def hello():
print('Hello world')
connection.call_later(5, hello)
def callback(_ch, _method, _properties, body):
body = json.loads(body)['order.stop.create']
ticker = None
if 'ticker' in body['data']['params']['condition']:
ticker = body['data']['params']['condition']['ticker']
if not ticker:
return
print('got ticker %s, gonna bind it...' % ticker)
bind_channel.queue_bind(
exchange='com.micex.lasttrades',
queue=queue_tickers,
routing_key=str(ticker))
print('ticker %s binded ok' % ticker)
logging.basicConfig(level=logging.INFO)
# Note: consuming with automatic acknowledgements has its risks
# and used here for simplicity.
# See https://www.rabbitmq.com/confirms.html.
consumer_channel.basic_consume(queue, callback, auto_ack=True)
try:
consumer_channel.start_consuming()
finally:
connection.close()
|