File: api.py

package info (click to toggle)
python-eventlet 0.26.1-7%2Bdeb11u1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 2,916 kB
  • sloc: python: 24,898; makefile: 98
file content (186 lines) | stat: -rw-r--r-- 5,444 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import os
import sys
import time
import struct
import socket
import random

from eventlet.green import threading
from eventlet.zipkin._thrift.zipkinCore import ttypes
from eventlet.zipkin._thrift.zipkinCore.constants import SERVER_SEND


client = None
_tls = threading.local()  # thread local storage


def put_annotation(msg, endpoint=None):
    """ This is annotation API.
    You can add your own annotation from in your code.
    Annotation is recorded with timestamp automatically.
    e.g.) put_annotation('cache hit for %s' % request)

    :param msg: String message
    :param endpoint: host info
    """
    if is_sample():
        a = ZipkinDataBuilder.build_annotation(msg, endpoint)
        trace_data = get_trace_data()
        trace_data.add_annotation(a)


def put_key_value(key, value, endpoint=None):
    """ This is binary annotation API.
    You can add your own key-value extra information from in your code.
    Key-value doesn't have a time component.
    e.g.) put_key_value('http.uri', '/hoge/index.html')

    :param key: String
    :param value: String
    :param endpoint: host info
    """
    if is_sample():
        b = ZipkinDataBuilder.build_binary_annotation(key, value, endpoint)
        trace_data = get_trace_data()
        trace_data.add_binary_annotation(b)


def is_tracing():
    """ Return whether the current thread is tracking or not """
    return hasattr(_tls, 'trace_data')


def is_sample():
    """ Return whether it should record trace information
        for the request or not
    """
    return is_tracing() and _tls.trace_data.sampled


def get_trace_data():
    if is_tracing():
        return _tls.trace_data


def set_trace_data(trace_data):
    _tls.trace_data = trace_data


def init_trace_data():
    if is_tracing():
        del _tls.trace_data


def _uniq_id():
    """
    Create a random 64-bit signed integer appropriate
    for use as trace and span IDs.
    XXX: By experimentation zipkin has trouble recording traces with ids
    larger than (2 ** 56) - 1
    """
    return random.randint(0, (2 ** 56) - 1)


def generate_trace_id():
    return _uniq_id()


def generate_span_id():
    return _uniq_id()


class TraceData(object):

    END_ANNOTATION = SERVER_SEND

    def __init__(self, name, trace_id, span_id, parent_id, sampled, endpoint):
        """
        :param name: RPC name (String)
        :param trace_id: int
        :param span_id: int
        :param parent_id: int or None
        :param sampled: lets the downstream servers know
                    if I should record trace data for the request (bool)
        :param endpoint: zipkin._thrift.zipkinCore.ttypes.EndPoint
        """
        self.name = name
        self.trace_id = trace_id
        self.span_id = span_id
        self.parent_id = parent_id
        self.sampled = sampled
        self.endpoint = endpoint
        self.annotations = []
        self.bannotations = []
        self._done = False

    def add_annotation(self, annotation):
        if annotation.host is None:
            annotation.host = self.endpoint
        if not self._done:
            self.annotations.append(annotation)
            if annotation.value == self.END_ANNOTATION:
                self.flush()

    def add_binary_annotation(self, bannotation):
        if bannotation.host is None:
            bannotation.host = self.endpoint
        if not self._done:
            self.bannotations.append(bannotation)

    def flush(self):
        span = ZipkinDataBuilder.build_span(name=self.name,
                                            trace_id=self.trace_id,
                                            span_id=self.span_id,
                                            parent_id=self.parent_id,
                                            annotations=self.annotations,
                                            bannotations=self.bannotations)
        client.send_to_collector(span)
        self.annotations = []
        self.bannotations = []
        self._done = True


class ZipkinDataBuilder:
    @staticmethod
    def build_span(name, trace_id, span_id, parent_id,
                   annotations, bannotations):
        return ttypes.Span(
            name=name,
            trace_id=trace_id,
            id=span_id,
            parent_id=parent_id,
            annotations=annotations,
            binary_annotations=bannotations
        )

    @staticmethod
    def build_annotation(value, endpoint=None):
        if isinstance(value, unicode):
            value = value.encode('utf-8')
        return ttypes.Annotation(time.time() * 1000 * 1000,
                                 str(value), endpoint)

    @staticmethod
    def build_binary_annotation(key, value, endpoint=None):
        annotation_type = ttypes.AnnotationType.STRING
        return ttypes.BinaryAnnotation(key, value, annotation_type, endpoint)

    @staticmethod
    def build_endpoint(ipv4=None, port=None, service_name=None):
        if ipv4 is not None:
            ipv4 = ZipkinDataBuilder._ipv4_to_int(ipv4)
        if service_name is None:
            service_name = ZipkinDataBuilder._get_script_name()
        return ttypes.Endpoint(
            ipv4=ipv4,
            port=port,
            service_name=service_name
        )

    @staticmethod
    def _ipv4_to_int(ipv4):
        return struct.unpack('!i', socket.inet_aton(ipv4))[0]

    @staticmethod
    def _get_script_name():
        return os.path.basename(sys.argv[0])