File: lambda_launcher.py

package info (click to toggle)
python-aws-xray-sdk 0.95-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 792 kB
  • sloc: python: 3,006; makefile: 20
file content (138 lines) | stat: -rw-r--r-- 4,286 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os
import logging
import threading

from .models.facade_segment import FacadeSegment
from .models.trace_header import TraceHeader
from .context import Context


# Module-level logger named after this module.
log = logging.getLogger(__name__)


# Name of the environment variable that LambdaContext._refresh_context reads
# to obtain the trace header string.
LAMBDA_TRACE_HEADER_KEY = '_X_AMZN_TRACE_ID'
# Name of the environment variable check_in_lambda() tests to decide whether
# the SDK is loaded inside a Lambda worker.
LAMBDA_TASK_ROOT_KEY = 'LAMBDA_TASK_ROOT'
# Directory and touch file that check_in_lambda() creates to signal that the
# SDK has been initialized.
TOUCH_FILE_DIR = '/tmp/.aws-xray/'
TOUCH_FILE_PATH = '/tmp/.aws-xray/initialized'


def check_in_lambda():
    """
    Return None if SDK is not loaded in AWS Lambda worker, i.e. when the
    environment variable named by ``LAMBDA_TASK_ROOT_KEY`` is unset or empty.
    Otherwise drop a touch file and return a lambda context.

    Creating the touch file is best-effort: a failure is logged and a
    ``LambdaContext`` is returned regardless.
    """
    if not os.getenv(LAMBDA_TASK_ROOT_KEY):
        return None

    try:
        os.mkdir(TOUCH_FILE_DIR)
    except OSError:
        # mkdir raises OSError both when the directory is already there and
        # on genuine failures (e.g. permissions); only claim "already exists"
        # when that is actually the case.
        if os.path.isdir(TOUCH_FILE_DIR):
            log.debug('directory %s already exists', TOUCH_FILE_DIR)
        else:
            log.debug('unable to create directory %s', TOUCH_FILE_DIR, exc_info=True)

    try:
        # 'w+' creates the file when missing; the context manager guarantees
        # the handle is closed.
        with open(TOUCH_FILE_PATH, 'w+'):
            pass
        # utime force second parameter in python2.7
        os.utime(TOUCH_FILE_PATH, None)
    except (IOError, OSError):
        log.warning("Unable to write to %s. Failed to signal SDK initialization.", TOUCH_FILE_PATH)

    return LambdaContext()


class LambdaContext(Context):
    """
    Lambda service will generate a segment for each function invocation which
    cannot be mutated. The context doesn't keep any manually created segment
    but instead every time ``get_trace_entity()`` gets called it refreshes the
    facade segment based on environment variables set by Lambda worker.
    """
    def __init__(self):
        # Thread-local storage holding ``segment`` (the current facade
        # segment) and ``entities`` (subsegments added via put_subsegment);
        # both are populated lazily by ``_initialize_context``.
        self._local = threading.local()

    def put_segment(self, segment):
        """
        No-op. The given segment is not stored; only a warning is logged.
        """
        log.warning('Cannot create segments inside Lambda function. Discarded.')

    def end_segment(self, end_time=None):
        """
        No-op. Nothing is ended; only a warning is logged.
        """
        log.warning('Cannot end segment inside Lambda function. Ignored.')

    def put_subsegment(self, subsegment):
        """
        Refresh the facade segment every time this function is invoked to prevent
        a new subsegment from being attached to a leaked segment/subsegment.

        If the current entity is the facade segment and it is still flagged
        as ``initializing``, the subsegment is discarded with a warning.
        Otherwise it is added to the current entity and recorded as the new
        current entity.
        """
        current_entity = self.get_trace_entity()

        if not self._is_subsegment(current_entity) and current_entity.initializing:
            log.warning("Subsegment %s discarded due to Lambda worker still initializing", subsegment.name)
            # Actually discard it, as the warning states, instead of falling
            # through and attaching it to the initializing facade segment.
            return

        current_entity.add_subsegment(subsegment)
        self._local.entities.append(subsegment)

    def get_trace_entity(self):
        """
        Refresh the context, then return the most recently added subsegment
        if there is one, otherwise the facade segment.
        """
        self._refresh_context()
        if getattr(self._local, 'entities', None):
            return self._local.entities[-1]
        else:
            return self._local.segment

    def _refresh_context(self):
        """
        Get current facade segment. To prevent resource leaking in Lambda worker,
        every time there is segment present, we compare its trace id to current
        environment variables. If it is different we create a new facade segment
        and clean up subsegments stored.
        """
        header_str = os.getenv(LAMBDA_TRACE_HEADER_KEY)
        trace_header = TraceHeader.from_header_str(header_str)
        segment = getattr(self._local, 'segment', None)

        # Ensure customers don't have leaked subsegments across invocations:
        # keep the existing facade only when the header carries no root or
        # the root still matches the stored segment's trace id.
        if segment and (not trace_header.root or trace_header.root == segment.trace_id):
            return

        self._initialize_context(trace_header)

    @property
    def context_missing(self):
        """Always None for this context."""
        return None

    @context_missing.setter
    def context_missing(self, value):
        # Assignments are accepted and ignored.
        pass

    def handle_context_missing(self):
        """
        No-op.
        """
        pass

    def _initialize_context(self, trace_header):
        """
        Create a facade segment based on environment variables
        set by AWS Lambda and initialize storage for subsegments.

        ``trace_header.sampled`` values 0 and 1 map to False and True;
        any other value leaves the sampling decision as None.
        """
        sampled = None
        if trace_header.sampled == 0:
            sampled = False
        elif trace_header.sampled == 1:
            sampled = True

        segment = FacadeSegment(
            name='facade',
            traceid=trace_header.root,
            entityid=trace_header.parent,
            sampled=sampled,
        )
        self._local.segment = segment
        # Start with an empty subsegment list for the new facade segment.
        self._local.entities = []