1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
|
import logging
import os
import binascii
import time
import string
import jsonpickle
from ..utils.compat import annotation_value_types, string_types
from .throwable import Throwable
from . import http
from ..exceptions.exceptions import AlreadyEndedException
log = logging.getLogger(__name__)
# List of valid characters found at http://docs.aws.amazon.com/xray/latest/devguide/xray-api-segmentdocuments.html
_valid_name_characters = string.ascii_letters + string.digits + r'_.:/%&#=+\-@ '
class Entity(object):
"""
The parent class for segment/subsegment. It holds common properties
and methods on segment and subsegment.
"""
def __init__(self, name):
# required attributes
self.id = self._generate_random_id()
self.name = name
self.name = ''.join([c for c in name if c in _valid_name_characters])
self.start_time = time.time()
self.parent_id = None
if self.name != name:
log.warning("Removing Segment/Subsugment Name invalid characters.")
# sampling
self.sampled = True
# state
self.in_progress = True
# meta fields
self.http = {}
self.annotations = {}
self.metadata = {}
self.aws = {}
self.cause = {}
# child subsegments
# list is thread-safe
self.subsegments = []
self.user = None
def close(self, end_time=None):
"""
Close the trace entity by setting `end_time`
and flip the in progress flag to False.
:param int end_time: Epoch in seconds. If not specified
current time will be used.
"""
self._check_ended()
if end_time:
self.end_time = end_time
else:
self.end_time = time.time()
self.in_progress = False
def add_subsegment(self, subsegment):
"""
Add input subsegment as a child subsegment.
"""
self._check_ended()
subsegment.parent_id = self.id
self.subsegments.append(subsegment)
def remove_subsegment(self, subsegment):
"""
Remove input subsegment from child subsegments.
"""
self._check_ended()
self.subsegments.remove(subsegment)
def put_http_meta(self, key, value):
"""
Add http related metadata.
:param str key: Currently supported keys are:
* url
* method
* user_agent
* client_ip
* status
* content_length
:param value: status and content_length are int and for other
supported keys string should be used.
"""
self._check_ended()
if value is None:
return
if key == http.STATUS:
if isinstance(value, string_types):
value = int(value)
self.apply_status_code(value)
if key in http.request_keys:
if 'request' not in self.http:
self.http['request'] = {}
self.http['request'][key] = value
elif key in http.response_keys:
if 'response' not in self.http:
self.http['response'] = {}
self.http['response'][key] = value
else:
log.warning("ignoring unsupported key %s in http meta.", key)
def put_annotation(self, key, value):
"""
Annotate segment or subsegment with a key-value pair.
Annotations will be indexed for later search query.
:param str key: annotation key
:param object value: annotation value. Any type other than
string/number/bool will be dropped
"""
self._check_ended()
if not isinstance(key, string_types):
log.warning("ignoring non string type annotation key with type %s.", type(key))
return
if not isinstance(value, annotation_value_types):
log.warning("ignoring unsupported annotation value type %s.", type(value))
return
self.annotations[key] = value
def put_metadata(self, key, value, namespace='default'):
"""
Add metadata to segment or subsegment. Metadata is not indexed
but can be later retrieved by BatchGetTraces API.
:param str namespace: optional. Default namespace is `default`.
It must be a string and prefix `AWS.` is reserved.
:param str key: metadata key under specified namespace
:param object value: any object that can be serialized into JSON string
"""
self._check_ended()
if not isinstance(namespace, string_types):
log.warning("ignoring non string type metadata namespace")
return
if namespace.startswith('AWS.'):
log.warning("Prefix 'AWS.' is reserved, drop metadata with namespace %s", namespace)
return
if self.metadata.get(namespace, None):
self.metadata[namespace][key] = value
else:
self.metadata[namespace] = {key: value}
def set_aws(self, aws_meta):
"""
set aws section of the entity.
This method is called by global recorder and botocore patcher
to provide additonal information about AWS runtime.
It is not recommended to manually set aws section.
"""
self._check_ended()
self.aws = aws_meta
def set_user(self, user):
"""
set user of an segment or subsegment.
one segment or subsegment can only hold one user.
User is indexed and can be later queried.
"""
self._check_ended()
self.user = user
def add_throttle_flag(self):
self.throttle = True
def add_fault_flag(self):
self.fault = True
def add_error_flag(self):
self.error = True
def apply_status_code(self, status_code):
"""
When a trace entity is generated under the http context,
the status code will affect this entity's fault/error/throttle flags.
Flip these flags based on status code.
"""
self._check_ended()
if not status_code:
return
if status_code >= 500:
self.add_fault_flag()
elif status_code == 429:
self.add_throttle_flag()
self.add_error_flag()
elif status_code >= 400:
self.add_error_flag()
def add_exception(self, exception, stack, remote=False):
"""
Add an exception to trace entities.
:param Exception exception: the catched exception.
:param list stack: the output from python built-in
`traceback.extract_stack()`.
:param bool remote: If False it means it's a client error
instead of a downstream service.
"""
self._check_ended()
self.add_fault_flag()
if hasattr(exception, '_recorded'):
setattr(self, 'cause', getattr(exception, '_cause_id'))
return
exceptions = []
exceptions.append(Throwable(exception, stack, remote))
self.cause['exceptions'] = exceptions
self.cause['working_directory'] = os.getcwd()
def serialize(self):
"""
Serialize to JSON document that can be accepted by the
X-Ray backend service. It uses jsonpickle to perform
serialization.
"""
try:
return jsonpickle.encode(self, unpicklable=False)
except Exception:
log.exception("got an exception during serialization")
def _delete_empty_properties(self, properties):
"""
Delete empty properties before serialization to avoid
extra keys with empty values in the output json.
"""
if not self.parent_id:
del properties['parent_id']
if not self.subsegments:
del properties['subsegments']
if not self.aws:
del properties['aws']
if not self.http:
del properties['http']
if not self.cause:
del properties['cause']
if not self.annotations:
del properties['annotations']
if not self.metadata:
del properties['metadata']
if not self.user:
del properties['user']
del properties['sampled']
def _check_ended(self):
if not self.in_progress:
raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
def _generate_random_id(self):
"""
Generate a random 16-digit hex str.
This is used for generating segment/subsegment id.
"""
return binascii.b2a_hex(os.urandom(8)).decode('utf-8')
|