1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
|
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from keystoneclient.v3 import client as keystoneclient
from oslo_config import cfg
from oslo_log import log
from zaqarclient.queues.v2 import client as zaqarclient
from zaqarclient import transport
from zaqarclient.transport import request
from os_collect_config import exc
from os_collect_config import keystone
from os_collect_config import merger
# Global oslo.config object. The collector below reads every option listed
# in ``opts`` through the ``CONF.zaqar`` group, using underscores in place of
# the dashes in the option names (e.g. 'region-name' -> CONF.zaqar.region_name).
CONF = cfg.CONF
# Module-level logger named after this module.
logger = log.getLogger(__name__)

# Option definitions for the Zaqar collector.
# NOTE(review): nothing in this module registers these options; presumably
# they are registered under a 'zaqar' group by code outside this file, since
# Collector reads them as CONF.zaqar.<name> -- confirm against the caller.
opts = [
    # user-id, password, project-id, auth-url and queue-id are all required:
    # Collector.collect() raises ZaqarMetadataNotConfigured when any of them
    # is None.
    cfg.StrOpt('user-id',
               help='User ID for API authentication'),
    cfg.StrOpt('password',
               help='Password for API authentication'),
    cfg.StrOpt('project-id',
               help='ID of project for API authentication'),
    cfg.StrOpt('auth-url',
               help='URL for API authentication'),
    cfg.StrOpt('queue-id',
               help='ID of the queue to be checked'),
    # Selects Collector.get_data_websocket() instead of
    # Collector.get_data_wsgi() when true.
    cfg.BoolOpt('use-websockets',
                default=False,
                help='Use the websocket transport to connect to Zaqar.'),
    # Optional: only added to the service catalog lookup when it is set.
    cfg.StrOpt('region-name',
               help='Region Name for extracting Zaqar endpoint'),
    # The negation of this flag is passed to the Zaqar client as 'insecure'.
    # When it is True, 'ca-file' must also be set or collect() raises
    # ZaqarMetadataNotConfigured.
    cfg.BoolOpt('ssl-certificate-validation',
                help='ssl certificat validation flag for connect to Zaqar',
                default=False),
    # Passed to the Zaqar client as 'cacert'.
    cfg.StrOpt('ca-file',
               help='CA Cert file for connect to Zaqar'),
]

# Collector name; Collector.collect() passes it as the last argument to
# merger.merged_list_from_content().
name = 'zaqar'
class Collector:
    """Collect metadata by popping a message from a Zaqar queue.

    Authentication goes through Keystone using the ``CONF.zaqar`` options.
    The message is then fetched either with the Zaqar v2 client
    (``get_data_wsgi``) or over a websocket transport
    (``get_data_websocket``), depending on ``CONF.zaqar.use_websockets``.
    """

    # Options that must be set before any request is attempted. They are
    # checked in this order and the first missing one is reported.
    _REQUIRED_OPTS = (
        'auth_url',
        'password',
        'project_id',
        'user_id',
        'queue_id',
    )

    def __init__(self,
                 keystoneclient=keystoneclient,
                 zaqarclient=zaqarclient,
                 discover_class=None,
                 transport=transport):
        """Store the client modules used to reach Keystone and Zaqar.

        All arguments default to the modules imported by this file and can
        be replaced with substitutes exposing the same attributes.

        :param keystoneclient: object forwarded to ``keystone.Keystone``.
        :param zaqarclient: object providing ``Client(endpoint, conf=...,
            version=...)``.
        :param discover_class: forwarded to ``keystone.Keystone``.
        :param transport: object providing ``get_transport_for(endpoint,
            options=...)``.
        """
        self.keystoneclient = keystoneclient
        self.zaqarclient = zaqarclient
        self.discover_class = discover_class
        self.transport = transport

    def _lookup_endpoint(self, ks, service_type):
        """Return the public URL for ``service_type`` from the catalog.

        The lookup is narrowed by ``CONF.zaqar.region_name`` only when that
        option is set.
        """
        kwargs = {'service_type': service_type, 'endpoint_type': 'publicURL'}
        if CONF.zaqar.region_name:
            kwargs['region_name'] = CONF.zaqar.region_name
        endpoint = ks.service_catalog.url_for(**kwargs)
        logger.debug('Fetching metadata from %s', endpoint)
        return endpoint

    def get_data_wsgi(self, ks, conf):
        """Pop one message from the queue using the Zaqar v2 client.

        :param ks: Keystone client exposing ``service_catalog.url_for``.
        :param conf: configuration dict passed to the Zaqar client.
        :returns: the ``body`` attribute of the first popped message.
        """
        endpoint = self._lookup_endpoint(ks, 'messaging')
        zaqar = self.zaqarclient.Client(endpoint, conf=conf, version=2)
        queue = zaqar.queue(CONF.zaqar.queue_id)
        # next() raises StopIteration if pop() yields nothing; collect()
        # reports that as ZaqarMetadataNotAvailable.
        message = next(queue.pop())
        return message.body

    def _create_req(self, endpoint, action, body):
        """Build a transport request whose content is ``body`` as JSON."""
        return request.Request(endpoint, action, content=json.dumps(body))

    def get_data_websocket(self, ks, conf):
        """Fetch one message body over the websocket transport.

        Creates the queue, subscribes to it, then pops one message that may
        already be waiting. If none is found, waits on ``ws.recv()``.

        :param ks: Keystone client exposing ``service_catalog.url_for``.
        :param conf: options dict passed to the transport.
        :returns: the ``body`` entry of the message that was obtained.
        """
        endpoint = self._lookup_endpoint(ks, 'messaging-websocket')
        queue_name = CONF.zaqar.queue_id

        with self.transport.get_transport_for(endpoint, options=conf) as ws:
            # create queue
            req = self._create_req(endpoint, 'queue_create',
                                   {'queue_name': queue_name})
            ws.send(req)
            # subscribe to queue messages
            req = self._create_req(endpoint, 'subscription_create',
                                   {'queue_name': queue_name,
                                    'ttl': 10000})
            ws.send(req)
            # check for pre-existing messages
            req = self._create_req(endpoint, 'message_delete_many',
                                   {'queue_name': queue_name,
                                    'pop': 1})
            resp = ws.send(req)
            messages = json.loads(resp.content).get('messages', [])

            if messages:
                # NOTE(dprince) In this case we are checking for queue
                # messages that arrived before we subscribed.
                logger.debug('Websocket message found...')
                data = messages[0]['body']
            else:
                # NOTE(dprince) This will block until there is data available
                # or the socket times out. Because we subscribe to the queue
                # it will allow us to process data immediately.
                logger.debug('websocket recv()')
                data = ws.recv()['body']

        return data

    def _check_configured(self):
        """Raise ZaqarMetadataNotConfigured if a needed option is missing."""
        for opt_name in self._REQUIRED_OPTS:
            if getattr(CONF.zaqar, opt_name) is None:
                logger.warning('No %s configured.', opt_name)
                raise exc.ZaqarMetadataNotConfigured()

        if CONF.zaqar.ssl_certificate_validation is True and (
                CONF.zaqar.ca_file is None):
            logger.warning('No CA file configured when flag ssl certificate '
                           'validation is on.')
            raise exc.ZaqarMetadataNotConfigured()
        # NOTE(flwang): To be compatible with old versions, we won't throw
        # error here if there is no region name.

    def collect(self):
        """Authenticate, fetch one queue message and return merged metadata.

        :returns: the result of ``merger.merged_list_from_content`` applied
            to the message body.
        :raises exc.ZaqarMetadataNotConfigured: if a required option is
            unset, or certificate validation is enabled without a CA file.
        :raises exc.ZaqarMetadataNotAvailable: if authentication, the fetch
            or the merge fails for any reason; the original error is logged
            and attached as the cause.
        """
        self._check_configured()

        try:
            ks = keystone.Keystone(
                auth_url=CONF.zaqar.auth_url,
                user_id=CONF.zaqar.user_id,
                password=CONF.zaqar.password,
                project_id=CONF.zaqar.project_id,
                keystoneclient=self.keystoneclient,
                discover_class=self.discover_class).client

            conf = {
                'auth_opts': {
                    'backend': 'keystone',
                    'options': {
                        'os_auth_token': ks.auth_token,
                        'os_project_id': CONF.zaqar.project_id,
                        'insecure': not CONF.zaqar.ssl_certificate_validation,
                        'cacert': CONF.zaqar.ca_file
                    }
                }
            }

            if CONF.zaqar.use_websockets:
                data = self.get_data_websocket(ks, conf)
            else:
                data = self.get_data_wsgi(ks, conf)

            return merger.merged_list_from_content(
                data, cfg.CONF.deployment_key, name)
        except Exception as e:
            # Any failure is reported uniformly so callers only need to
            # handle ZaqarMetadataNotAvailable; the cause stays attached.
            logger.warning(str(e))
            raise exc.ZaqarMetadataNotAvailable() from e
|