1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
import logging
import logging.config
import uuid
import time
import requests
import pika
class ProcessServiceClient(object):
def __init__(self, rabbitmq_host=None, rabbitmq_port=5672, rabbitmq_vhost='/', rabbitmq_user=None, rabbitmq_password=None, logger=None):
self.rabbitmq = rabbitmq_host
self.remote = False
self.session = None
self.proxy = None
self.channel = None
self.biomaj_process = None
self.logger = logging
if logger:
self.logger = logger
if rabbitmq_host:
self.remote = True
connection = None
if rabbitmq_user:
credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host, rabbitmq_port, rabbitmq_vhost, credentials, heartbeat_interval=0))
else:
connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host, rabbitmq_port, rabbitmq_vhost, heartbeat_interval=0))
self.channel = connection.channel()
def create_session(self, bank, proxy=None):
self.bank = bank
if not self.remote:
self.session = str(uuid.uuid4())
return self.session
for i in range(3):
try:
url = proxy + '/api/process/session/' + bank
r = requests.post(url)
if r.status_code == 200:
result = r.json()
self.session = result['session']
self.proxy = proxy
return result['session']
except Exception:
logging.exception('Failed to send create operation: %s' % (url))
time.sleep(5)
raise Exception('Failed to connect to the process proxy')
def execute_process(self, biomaj_process):
self.biomaj_process = biomaj_process
if self.remote:
self.channel.basic_publish(
exchange='',
routing_key='biomajprocess',
body=biomaj_process.SerializeToString(),
properties=pika.BasicProperties(
# make message persistent
delivery_mode=2
))
def wait_for_process(self):
over = False
exitcode = -1
info = None
logging.info("Process:RemoteProcess:Waiting:" + str(self.biomaj_process.process.name))
errors = 0
while not over:
if errors >= 3:
raise Exception('Failed to contact process proxy 3 times, stopping...')
result = {'exitcode': -1}
try:
r = requests.get(self.proxy + '/api/process/session/' + self.bank + '/' + self.session)
if not r.status_code == 200:
logging.error('Failed to connect to the process proxy')
errors += 1
else:
result = r.json()
errors = 0
except Exception:
logging.exception('Failed to get status from process proxy')
errors += 1
# {'error': error, 'exitcode': exitcode, 'info': info}
if result['exitcode'] > -1:
exitcode = result['exitcode']
over = True
if result['exitcode'] > 0:
info = result['info']
self.logger.error('Process:RemoteProcess:Error:' + str(self.biomaj_process.process.name) + ': ' + str(result['info']))
else:
time.sleep(10)
return (exitcode, info)
def clean(self):
if self.remote:
for i in range(3):
try:
url = self.proxy + '/api/process/session/' + self.bank + '/' + self.session
r = requests.delete(url)
if r.status_code == 200:
return
except Exception:
logging.exception('Failed to send clean operation: %s' % (url))
time.sleep(5)
logging.error('Process:RemoteProcess:Session:Clean:Error:' + self.bank + '/' + self.session)
|