File: consumer_queued.py

package info (click to toggle)
python-pika 0.9.14-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 1,048 kB
  • ctags: 2,110
  • sloc: python: 10,046; makefile: 134
file content (66 lines) | stat: -rw-r--r-- 1,928 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#!/usr/bin/python
# -*- coding: utf-8 -*-

import pika
import json
import threading


buffer = []
lock = threading.Lock()

print('pika version: %s' % pika.__version__)


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

main_channel     = connection.channel()
consumer_channel = connection.channel()
bind_channel     = connection.channel()

if pika.__version__=='0.9.5':
    main_channel.exchange_declare(exchange='com.micex.sten',       type='direct')
    main_channel.exchange_declare(exchange='com.micex.lasttrades', type='direct')
else:
    main_channel.exchange_declare(exchange='com.micex.sten',       exchange_type='direct')
    main_channel.exchange_declare(exchange='com.micex.lasttrades', exchange_type='direct')

queue         = main_channel.queue_declare(exclusive=True).method.queue
queue_tickers = main_channel.queue_declare(exclusive=True).method.queue

main_channel.queue_bind(exchange='com.micex.sten', queue=queue, routing_key='order.stop.create')



def process_buffer():
    if not lock.acquire(False):
        print('locked!')
        return
    try:
        while len(buffer):
            body = buffer.pop(0)

            ticker = None
            if 'ticker' in body['data']['params']['condition']: ticker = body['data']['params']['condition']['ticker']
            if not ticker: continue

            print('got ticker %s, gonna bind it...' % ticker)
            bind_channel.queue_bind(exchange='com.micex.lasttrades', queue=queue_tickers, routing_key=str(ticker))
            print('ticker %s binded ok' % ticker)
    finally:
        lock.release()


def callback(ch, method, properties, body):
    body = json.loads(body)['order.stop.create']
    buffer.append(body)
    process_buffer()


consumer_channel.basic_consume(callback,
                               queue=queue, no_ack=True)

try:
    consumer_channel.start_consuming()
finally:
    connection.close()