File: zaqar.py

package info (click to toggle)
python-os-collect-config 14.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 360 kB
  • sloc: python: 3,089; makefile: 19
file content (181 lines) | stat: -rw-r--r-- 7,088 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json

from keystoneclient.v3 import client as keystoneclient
from oslo_config import cfg
from oslo_log import log
from zaqarclient.queues.v2 import client as zaqarclient
from zaqarclient import transport
from zaqarclient.transport import request

from os_collect_config import exc
from os_collect_config import keystone
from os_collect_config import merger

# oslo.config's shared configuration object and this module's logger.
CONF = cfg.CONF
logger = log.getLogger(__name__)

# Option definitions for this collector. The code in this module reads
# them back as attributes of CONF.zaqar (e.g. CONF.zaqar.queue_id).
opts = [
    # Credentials and endpoint used for API authentication.
    cfg.StrOpt(
        'user-id',
        help='User ID for API authentication',
    ),
    cfg.StrOpt(
        'password',
        help='Password for API authentication',
    ),
    cfg.StrOpt(
        'project-id',
        help='ID of project for API authentication',
    ),
    cfg.StrOpt(
        'auth-url',
        help='URL for API authentication',
    ),
    # Which queue to read and which transport to read it with.
    cfg.StrOpt(
        'queue-id',
        help='ID of the queue to be checked',
    ),
    cfg.BoolOpt(
        'use-websockets',
        default=False,
        help='Use the websocket transport to connect to Zaqar.',
    ),
    cfg.StrOpt(
        'region-name',
        help='Region Name for extracting Zaqar endpoint',
    ),
    # TLS settings for the connection to Zaqar.
    cfg.BoolOpt(
        'ssl-certificate-validation',
        default=False,
        help='ssl certificat validation flag for connect to Zaqar',
    ),
    cfg.StrOpt(
        'ca-file',
        help='CA Cert file for connect to Zaqar',
    ),
]

# Collector name; handed to the merger together with the collected data.
name = 'zaqar'


class Collector:
    """Fetch metadata from a Zaqar queue and hand it to the merger.

    The client modules are injected through the constructor and default to
    the modules imported at the top of this file, so alternative
    implementations can be supplied.
    """

    # Options that must be set before collect() tries to authenticate,
    # in the order they are checked (and therefore reported).
    _REQUIRED_OPTS = ('auth_url', 'password', 'project_id', 'user_id',
                      'queue_id')

    def __init__(self,
                 keystoneclient=keystoneclient,
                 zaqarclient=zaqarclient,
                 discover_class=None,
                 transport=transport):
        """Remember the client modules used by the collector.

        :param keystoneclient: passed through to ``keystone.Keystone``.
        :param zaqarclient: module whose ``Client`` is used by
            ``get_data_wsgi``.
        :param discover_class: passed through to ``keystone.Keystone``.
        :param transport: module whose ``get_transport_for`` is used by
            ``get_data_websocket``.
        """
        self.keystoneclient = keystoneclient
        self.zaqarclient = zaqarclient
        self.discover_class = discover_class
        self.transport = transport

    def _endpoint_for(self, ks, service_type):
        """Return the public catalog URL for ``service_type`` and log it.

        The lookup is restricted to ``CONF.zaqar.region_name`` only when a
        region is configured.
        """
        kwargs = {'service_type': service_type, 'endpoint_type': 'publicURL'}
        if CONF.zaqar.region_name:
            kwargs['region_name'] = CONF.zaqar.region_name
        endpoint = ks.service_catalog.url_for(**kwargs)
        logger.debug('Fetching metadata from %s', endpoint)
        return endpoint

    def get_data_wsgi(self, ks, conf):
        """Pop one message from the configured queue via ``Client``.

        :param ks: client object exposing ``service_catalog.url_for``.
        :param conf: configuration dict handed to the Zaqar client.
        :returns: the ``body`` attribute of the popped message.
        :raises StopIteration: if the iterator returned by ``queue.pop()``
            yields nothing.
        """
        endpoint = self._endpoint_for(ks, 'messaging')
        zaqar = self.zaqarclient.Client(endpoint, conf=conf, version=2)

        queue = zaqar.queue(CONF.zaqar.queue_id)
        message = next(queue.pop())
        return message.body

    def _create_req(self, endpoint, action, body):
        """Build a transport request whose content is ``body`` as JSON."""
        return request.Request(endpoint, action, content=json.dumps(body))

    def get_data_websocket(self, ks, conf):
        """Fetch one message body over the websocket transport.

        Creates the queue, subscribes to it, then pops one message. If
        nothing was popped, falls back to ``recv()`` on the transport.

        :param ks: client object exposing ``service_catalog.url_for``.
        :param conf: options dict handed to ``get_transport_for``.
        :returns: the ``body`` entry of the popped or received message.
        """
        endpoint = self._endpoint_for(ks, 'messaging-websocket')
        queue_name = CONF.zaqar.queue_id

        with self.transport.get_transport_for(endpoint, options=conf) as ws:
            # create queue
            ws.send(self._create_req(endpoint, 'queue_create',
                                     {'queue_name': queue_name}))
            # subscribe to queue messages
            ws.send(self._create_req(endpoint, 'subscription_create',
                                     {'queue_name': queue_name,
                                      'ttl': 10000}))

            # check for pre-existing messages
            resp = ws.send(self._create_req(endpoint, 'message_delete_many',
                                            {'queue_name': queue_name,
                                             'pop': 1}))
            messages = json.loads(resp.content).get('messages', [])

            if messages:
                # NOTE(dprince) In this case we are checking for queue
                # messages that arrived before we subscribed.
                logger.debug('Websocket message found...')
                data = messages[0]['body']
            else:
                # NOTE(dprince) This will block until there is data available
                # or the socket times out. Because we subscribe to the queue
                # it will allow us to process data immediately.
                logger.debug('websocket recv()')
                data = ws.recv()['body']

        return data

    def collect(self):
        """Validate configuration, fetch the queue data and merge it.

        :returns: the result of ``merger.merged_list_from_content`` for the
            fetched data.
        :raises ZaqarMetadataNotConfigured: if a required option is unset,
            or certificate validation is enabled without a CA file.
        :raises ZaqarMetadataNotAvailable: if authentication, the fetch or
            the merge raises any exception; the original error is logged
            and attached as the cause.
        """
        for opt in self._REQUIRED_OPTS:
            if getattr(CONF.zaqar, opt) is None:
                logger.warning('No %s configured.', opt)
                raise exc.ZaqarMetadataNotConfigured()
        if CONF.zaqar.ssl_certificate_validation is True and (
                CONF.zaqar.ca_file is None):
            logger.warning('No CA file configured when flag ssl certificate '
                           'validation is on.')
            raise exc.ZaqarMetadataNotConfigured()
        # NOTE(flwang): To be compatible with old versions, we won't throw
        # error here if there is no region name.

        try:
            ks = keystone.Keystone(
                auth_url=CONF.zaqar.auth_url,
                user_id=CONF.zaqar.user_id,
                password=CONF.zaqar.password,
                project_id=CONF.zaqar.project_id,
                keystoneclient=self.keystoneclient,
                discover_class=self.discover_class).client

            conf = {
                'auth_opts': {
                    'backend': 'keystone',
                    'options': {
                        'os_auth_token': ks.auth_token,
                        'os_project_id': CONF.zaqar.project_id,
                        # 'insecure' is the inverse of the validation flag.
                        'insecure': not CONF.zaqar.ssl_certificate_validation,
                        'cacert': CONF.zaqar.ca_file
                    }
                }
            }

            if CONF.zaqar.use_websockets:
                data = self.get_data_websocket(ks, conf)
            else:
                data = self.get_data_wsgi(ks, conf)

            return merger.merged_list_from_content(
                data, cfg.CONF.deployment_key, name)

        except Exception as e:
            # Any failure above is reported uniformly as "not available";
            # the original exception is kept as the cause for debugging.
            logger.warning(str(e))
            raise exc.ZaqarMetadataNotAvailable() from e