1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
import os
import logging
import threading
from .models.facade_segment import FacadeSegment
from .models.trace_header import TraceHeader
from .context import Context
log = logging.getLogger(__name__)
LAMBDA_TRACE_HEADER_KEY = '_X_AMZN_TRACE_ID'
LAMBDA_TASK_ROOT_KEY = 'LAMBDA_TASK_ROOT'
TOUCH_FILE_DIR = '/tmp/.aws-xray/'
TOUCH_FILE_PATH = '/tmp/.aws-xray/initialized'
def check_in_lambda():
"""
Return None if SDK is not loaded in AWS Lambda worker.
Otherwise drop a touch file and return a lambda context.
"""
if not os.getenv(LAMBDA_TASK_ROOT_KEY):
return None
try:
os.mkdir(TOUCH_FILE_DIR)
except OSError:
log.debug('directory %s already exists', TOUCH_FILE_DIR)
try:
f = open(TOUCH_FILE_PATH, 'w+')
f.close()
# utime force second parameter in python2.7
os.utime(TOUCH_FILE_PATH, None)
except (IOError, OSError):
log.warning("Unable to write to %s. Failed to signal SDK initialization." % TOUCH_FILE_PATH)
return LambdaContext()
class LambdaContext(Context):
"""
Lambda service will generate a segment for each function invocation which
cannot be mutated. The context doesn't keep any manually created segment
but instead every time ``get_trace_entity()`` gets called it refresh the facade
segment based on environment variables set by Lambda worker.
"""
def __init__(self):
self._local = threading.local()
def put_segment(self, segment):
"""
No-op.
"""
log.warning('Cannot create segments inside Lambda function. Discarded.')
def end_segment(self, end_time=None):
"""
No-op.
"""
log.warning('Cannot end segment inside Lambda function. Ignored.')
def put_subsegment(self, subsegment):
"""
Refresh the facade segment every time this function is invoked to prevent
a new subsegment from being attached to a leaked segment/subsegment.
"""
current_entity = self.get_trace_entity()
if not self._is_subsegment(current_entity) and current_entity.initializing:
log.warning("Subsegment %s discarded due to Lambda worker still initializing" % subsegment.name)
current_entity.add_subsegment(subsegment)
self._local.entities.append(subsegment)
def get_trace_entity(self):
self._refresh_context()
if getattr(self._local, 'entities', None):
return self._local.entities[-1]
else:
return self._local.segment
def _refresh_context(self):
"""
Get current facade segment. To prevent resource leaking in Lambda worker,
every time there is segment present, we compare its trace id to current
environment variables. If it is different we create a new facade segment
and clean up subsegments stored.
"""
header_str = os.getenv(LAMBDA_TRACE_HEADER_KEY)
trace_header = TraceHeader.from_header_str(header_str)
segment = getattr(self._local, 'segment', None)
if segment:
# Ensure customers don't have leaked subsegments across invocations
if not trace_header.root or trace_header.root == segment.trace_id:
return
else:
self._initialize_context(trace_header)
else:
self._initialize_context(trace_header)
@property
def context_missing(self):
return None
@context_missing.setter
def context_missing(self, value):
pass
def handle_context_missing(self):
"""
No-op.
"""
pass
def _initialize_context(self, trace_header):
"""
Create a facade segment based on environment variables
set by AWS Lambda and initialize storage for subsegments.
"""
sampled = None
if trace_header.sampled == 0:
sampled = False
elif trace_header.sampled == 1:
sampled = True
segment = FacadeSegment(
name='facade',
traceid=trace_header.root,
entityid=trace_header.parent,
sampled=sampled,
)
setattr(self._local, 'segment', segment)
setattr(self._local, 'entities', [])
|