File: process_client.py

package info (click to toggle)
biomaj3-process 3.0.19-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 284 kB
  • sloc: python: 903; sh: 14; makefile: 10
file content (108 lines) | stat: -rw-r--r-- 4,245 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import logging
import logging.config
import uuid
import time

import requests
import pika


class ProcessServiceClient(object):
    """Client used to submit a process and follow it through the process proxy.

    When a RabbitMQ host is given, the client works in *remote* mode: sessions
    are created, polled and deleted through the HTTP process proxy, and
    processes are published on the ``biomajprocess`` queue.  Without a host
    the client is *local*: it only generates a session identifier and keeps
    a reference to the process handed to ``execute_process``.
    """

    def __init__(self, rabbitmq_host=None, rabbitmq_port=5672, rabbitmq_vhost='/',
                 rabbitmq_user=None, rabbitmq_password=None, logger=None):
        """Set up the client and, in remote mode, open the RabbitMQ channel.

        :param rabbitmq_host: RabbitMQ host; when falsy the client stays local
        :param rabbitmq_port: RabbitMQ port
        :param rabbitmq_vhost: RabbitMQ virtual host
        :param rabbitmq_user: optional user name, enables plain credentials
        :param rabbitmq_password: password used along with ``rabbitmq_user``
        :param logger: logger to use, defaults to the ``logging`` module
        """
        self.rabbitmq = rabbitmq_host
        self.remote = False
        self.session = None
        self.proxy = None
        # Initialised here so that clean()/wait_for_process() can always read it.
        self.bank = None
        self.connection = None
        self.channel = None
        self.biomaj_process = None
        self.logger = logger if logger else logging
        if rabbitmq_host:
            self.remote = True
            conn_args = [rabbitmq_host, rabbitmq_port, rabbitmq_vhost]
            if rabbitmq_user:
                conn_args.append(pika.PlainCredentials(rabbitmq_user, rabbitmq_password))
            try:
                # Recent pika releases only accept ``heartbeat``.
                parameters = pika.ConnectionParameters(*conn_args, heartbeat=0)
            except TypeError:
                # Older pika releases only know ``heartbeat_interval``.
                parameters = pika.ConnectionParameters(*conn_args, heartbeat_interval=0)
            # Keep a handle on the connection so that it can be closed later.
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()

    def close(self):
        """Close the RabbitMQ connection opened by the constructor, if any."""
        if self.connection is not None:
            if self.connection.is_open:
                self.connection.close()
            self.connection = None
            self.channel = None

    def create_session(self, bank, proxy=None):
        """Create a process session for a bank and return its identifier.

        In local mode a random UUID is generated.  In remote mode the session
        is requested from the process proxy with up to 3 attempts.

        :param bank: bank name
        :param proxy: base URL of the process proxy, required in remote mode
        :return: session identifier
        :raises ValueError: in remote mode when no proxy is given
        :raises Exception: when the proxy did not deliver a session
        """
        self.bank = bank
        if not self.remote:
            self.session = str(uuid.uuid4())
            return self.session

        if not proxy:
            raise ValueError('A process proxy URL is needed to create a remote session')

        # Built outside of the try block so the error handler can always log it.
        url = proxy + '/api/process/session/' + bank
        for _ in range(3):
            try:
                r = requests.post(url)
                if r.status_code == 200:
                    result = r.json()
                    self.session = result['session']
                    self.proxy = proxy
                    return result['session']
                self.logger.error('Failed to create session, status %s: %s', r.status_code, url)
            except Exception:
                self.logger.exception('Failed to send create operation: %s', url)
                time.sleep(5)
        raise Exception('Failed to connect to the process proxy')

    def execute_process(self, biomaj_process):
        """Record the process and, in remote mode, publish it on the queue.

        :param biomaj_process: process message; it must provide
            ``SerializeToString()`` and a ``process.name`` attribute
            (presumably a protobuf message, to be confirmed against callers)
        """
        self.biomaj_process = biomaj_process
        if self.remote:
            self.channel.basic_publish(
                exchange='',
                routing_key='biomajprocess',
                body=biomaj_process.SerializeToString(),
                properties=pika.BasicProperties(
                    # make message persistent
                    delivery_mode=2
                ))

    def wait_for_process(self):
        """Poll the process proxy until the submitted process is over.

        The proxy is queried every 10 seconds; the process is considered over
        as soon as the returned exit code is greater than -1.

        :return: tuple ``(exitcode, info)``, ``info`` is only set when the
            exit code is greater than 0
        :raises Exception: when no remote session exists or when the proxy
            could not be queried 3 times in a row
        """
        exitcode = -1
        info = None
        process_name = str(self.biomaj_process.process.name)
        self.logger.info("Process:RemoteProcess:Waiting:" + process_name)
        if self.proxy is None or self.session is None:
            # Without a session every poll would fail, so fail right away.
            raise Exception('No remote process session, cannot wait for process')

        url = self.proxy + '/api/process/session/' + self.bank + '/' + self.session
        errors = 0
        while True:
            if errors >= 3:
                raise Exception('Failed to contact process proxy 3 times, stopping...')
            result = {'exitcode': -1}
            try:
                r = requests.get(url)
                if r.status_code != 200:
                    self.logger.error('Failed to connect to the process proxy')
                    errors += 1
                else:
                    result = r.json()
                    errors = 0
            except Exception:
                self.logger.exception('Failed to get status from process proxy')
                errors += 1

            # {'error': error, 'exitcode': exitcode, 'info': info}
            if result['exitcode'] > -1:
                exitcode = result['exitcode']
                if exitcode > 0:
                    info = result['info']
                    self.logger.error('Process:RemoteProcess:Error:' + process_name + ': ' + str(info))
                break
            time.sleep(10)
        return (exitcode, info)

    def clean(self):
        """Delete the remote session on the process proxy (best effort).

        Nothing is done in local mode or when no remote session was created.
        Up to 3 attempts are made; a final failure is only logged.
        """
        if not self.remote:
            return
        if self.proxy is None or self.session is None:
            # No session was ever created on the proxy, nothing to delete.
            return

        url = self.proxy + '/api/process/session/' + self.bank + '/' + self.session
        for _ in range(3):
            try:
                r = requests.delete(url)
                if r.status_code == 200:
                    return
            except Exception:
                self.logger.exception('Failed to send clean operation: %s', url)
                time.sleep(5)
        self.logger.error('Process:RemoteProcess:Session:Clean:Error:' + self.bank + '/' + self.session)