1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
|
# Copyright 2016 Mirantis Inc.
# Copyright 2016 IBM Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from urllib import parse as parser
from debtcollector import removals
from oslo_config import cfg
from oslo_serialization import jsonutils
from osprofiler.drivers import base
from osprofiler import exc
class Redis(base.Driver):
@removals.removed_kwarg("db", message="'db' parameter is deprecated "
"and will be removed in future. "
"Please specify 'db' in "
"'connection_string' instead.")
def __init__(self, connection_str, db=0, project=None,
service=None, host=None, conf=cfg.CONF, **kwargs):
"""Redis driver for OSProfiler."""
super(Redis, self).__init__(connection_str, project=project,
service=service, host=host,
conf=conf, **kwargs)
try:
from redis import Redis as _Redis
except ImportError:
raise exc.CommandError(
"To use OSProfiler with Redis driver, "
"please install `redis` library. "
"To install with pip:\n `pip install redis`.")
# only connection over network is supported with schema
# redis://[:password]@host[:port][/db]
self.db = _Redis.from_url(self.connection_str)
self.namespace_opt = "osprofiler_opt:"
self.namespace = "osprofiler:" # legacy
self.namespace_error = "osprofiler_error:"
@classmethod
def get_name(cls):
return "redis"
def notify(self, info):
"""Send notifications to Redis.
:param info: Contains information about trace element.
In payload dict there are always 3 ids:
"base_id" - uuid that is common for all notifications
related to one trace. Used to simplify retrieving of all
trace elements from Redis.
"parent_id" - uuid of parent element in trace
"trace_id" - uuid of current element in trace
With parent_id and trace_id it's quite simple to build
tree of trace elements, which simplify analyze of trace.
"""
data = info.copy()
data["project"] = self.project
data["service"] = self.service
key = self.namespace_opt + data["base_id"]
self.db.lpush(key, jsonutils.dumps(data))
if (self.filter_error_trace
and data.get("info", {}).get("etype") is not None):
self.notify_error_trace(data)
def notify_error_trace(self, data):
"""Store base_id and timestamp of error trace to a separate key."""
key = self.namespace_error + data["base_id"]
value = jsonutils.dumps({
"base_id": data["base_id"],
"timestamp": data["timestamp"]
})
self.db.set(key, value)
def list_traces(self, fields=None):
"""Query all traces from the storage.
:param fields: Set of trace fields to return. Defaults to 'base_id'
and 'timestamp'
:returns: List of traces, where each trace is a dictionary containing
at least `base_id` and `timestamp`.
"""
fields = set(fields or self.default_trace_fields)
# first get legacy events
result = self._list_traces_legacy(fields)
# with optimized schema trace events are stored in a list
ids = self.db.scan_iter(match=self.namespace_opt + "*")
for i in ids:
# for each trace query the first event to have a timestamp
first_event = jsonutils.loads(self.db.lindex(i, 1))
result.append({key: value for key, value in first_event.items()
if key in fields})
return result
def _list_traces_legacy(self, fields):
# With current schema every event is stored under its own unique key
# To query all traces we first need to get all keys, then
# get all events, sort them and pick up only the first one
ids = self.db.scan_iter(match=self.namespace + "*")
traces = [jsonutils.loads(self.db.get(i)) for i in ids]
traces.sort(key=lambda x: x["timestamp"])
seen_ids = set()
result = []
for trace in traces:
if trace["base_id"] not in seen_ids:
seen_ids.add(trace["base_id"])
result.append({key: value for key, value in trace.items()
if key in fields})
return result
def list_error_traces(self):
"""Returns all traces that have error/exception."""
ids = self.db.scan_iter(match=self.namespace_error + "*")
traces = [jsonutils.loads(self.db.get(i)) for i in ids]
traces.sort(key=lambda x: x["timestamp"])
seen_ids = set()
result = []
for trace in traces:
if trace["base_id"] not in seen_ids:
seen_ids.add(trace["base_id"])
result.append(trace)
return result
def get_report(self, base_id):
"""Retrieves and parses notification from Redis.
:param base_id: Base id of trace elements.
"""
def iterate_events():
for key in self.db.scan_iter(
match=self.namespace + base_id + "*"): # legacy
yield self.db.get(key)
for event in self.db.lrange(self.namespace_opt + base_id, 0, -1):
yield event
for data in iterate_events():
n = jsonutils.loads(data)
trace_id = n["trace_id"]
parent_id = n["parent_id"]
name = n["name"]
project = n["project"]
service = n["service"]
host = n["info"]["host"]
timestamp = n["timestamp"]
self._append_results(trace_id, parent_id, name, project, service,
host, timestamp, n)
return self._parse_results()
class RedisSentinel(Redis, base.Driver):
@removals.removed_kwarg("db", message="'db' parameter is deprecated "
"and will be removed in future. "
"Please specify 'db' in "
"'connection_string' instead.")
def __init__(self, connection_str, db=0, project=None,
service=None, host=None, conf=cfg.CONF, **kwargs):
"""Redis driver for OSProfiler."""
super(RedisSentinel, self).__init__(connection_str, project=project,
service=service, host=host,
conf=conf, **kwargs)
try:
from redis.sentinel import Sentinel
except ImportError:
raise exc.CommandError(
"To use this command, you should install "
"'redis' manually. Use command:\n "
"'pip install redis'.")
self.conf = conf
socket_timeout = self.conf.profiler.socket_timeout
parsed_url = parser.urlparse(self.connection_str)
sentinel = Sentinel([(parsed_url.hostname, int(parsed_url.port))],
password=parsed_url.password,
socket_timeout=socket_timeout)
self.db = sentinel.master_for(self.conf.profiler.sentinel_service_name,
socket_timeout=socket_timeout)
@classmethod
def get_name(cls):
return "redissentinel"
|