File: logging_helper.py

package info (click to toggle)
python-py-zipkin 1.2.8-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 300 kB
  • sloc: python: 1,556; makefile: 3
file content (224 lines) | stat: -rw-r--r-- 8,080 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import os
import time
from types import TracebackType
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Type
from typing import Union

from py_zipkin import Kind
from py_zipkin.encoding._encoders import get_encoder
from py_zipkin.encoding._encoders import IEncoder
from py_zipkin.encoding._helpers import copy_endpoint_with_new_service_name
from py_zipkin.encoding._helpers import Endpoint
from py_zipkin.encoding._helpers import Span
from py_zipkin.encoding._types import Encoding
from py_zipkin.exception import ZipkinError
from py_zipkin.storage import Tracer
from py_zipkin.transport import BaseTransportHandler
from py_zipkin.util import ZipkinAttrs


# Annotation key stamped on the root span right before it is flushed, when
# the context was created with `add_logging_annotation=True`.
LOGGING_END_KEY = "py_zipkin.logging_end"


# A transport is either a full BaseTransportHandler instance or a bare
# callable that accepts the encoded payload (str or bytes).
TransportHandler = Union[BaseTransportHandler, Callable[[Union[str, bytes]], None]]


class ZipkinLoggingContext:
    """Logging context tied to a single Zipkin trace.

    While active, it accumulates local child spans in the tracer's span
    storage; on exit it serializes the root "server"/"client" span together
    with those children and hands the encoded batches to a transport handler.
    A firehose handler (if configured) always receives the spans; the regular
    transport handler only receives them when the trace is sampled.

    Only the main `zipkin_span` entrypoint should instantiate this class.
    """

    def __init__(
        self,
        zipkin_attrs: ZipkinAttrs,
        endpoint: Endpoint,
        span_name: str,
        transport_handler: Optional[TransportHandler],
        report_root_timestamp: float,
        get_tracer: Callable[[], Tracer],
        service_name: str,
        binary_annotations: Optional[Dict[str, Optional[str]]] = None,
        add_logging_annotation: bool = False,
        client_context: bool = False,
        max_span_batch_size: Optional[int] = None,
        firehose_handler: Optional[TransportHandler] = None,
        encoding: Optional[Encoding] = None,
        annotations: Optional[Dict[str, Optional[float]]] = None,
    ):
        self.zipkin_attrs = zipkin_attrs
        self.endpoint = endpoint
        self.span_name = span_name
        self.transport_handler = transport_handler
        self.response_status_code = 0
        self._get_tracer = get_tracer
        self.service_name = service_name
        self.report_root_timestamp = report_root_timestamp
        # Fall back to fresh dicts so callers can mutate these safely.
        self.tags = binary_annotations or {}
        self.add_logging_annotation = add_logging_annotation
        self.client_context = client_context
        self.max_span_batch_size = max_span_batch_size
        self.firehose_handler = firehose_handler
        self.annotations = annotations or {}
        self.remote_endpoint: Optional[Endpoint] = None

        # `zipkin_span` always supplies an encoding by the time we get here.
        assert encoding is not None
        self.encoder = get_encoder(encoding)

    def start(self) -> "ZipkinLoggingContext":
        """Record the wall-clock start of the request; returns self."""
        self.start_timestamp = time.time()
        return self

    def stop(self) -> None:
        """Flush all collected spans once the request has been handled."""
        self.emit_spans()

    def emit_spans(self) -> None:
        """Serialize and send every annotation stored during the request.

        This happens when the request is sampled and the response was a
        success. It also logs the service (`ss` and `sr`) or the client
        ('cs' and 'cr') annotations.
        """
        # FIXME: Should have a single aggregate handler
        if self.firehose_handler:
            # FIXME: We need to allow different batching settings per handler
            firehose_sender = ZipkinBatchSender(
                self.firehose_handler, self.max_span_batch_size, self.encoder
            )
            self._emit_spans_with_span_sender(firehose_sender)

        # The regular transport only sees sampled traces; either way the
        # tracer's span storage is cleared before returning.
        if self.zipkin_attrs.is_sampled:
            self._emit_spans_with_span_sender(
                ZipkinBatchSender(
                    self.transport_handler, self.max_span_batch_size, self.encoder
                )
            )
        self._get_tracer().clear()

    def _emit_spans_with_span_sender(self, span_sender: "ZipkinBatchSender") -> None:
        """Feed every stored child span, then the root span, to *span_sender*."""
        with span_sender:
            stop_time = time.time()

            # Re-home each collected child span onto this context's endpoint,
            # preserving the service name the span was created with.
            for child in self._get_tracer()._span_storage:
                assert child.local_endpoint is not None
                child.local_endpoint = copy_endpoint_with_new_service_name(
                    self.endpoint,
                    child.local_endpoint.service_name,
                )
                span_sender.add_span(child)

            if self.add_logging_annotation:
                self.annotations[LOGGING_END_KEY] = time.time()

            root_span = Span(
                trace_id=self.zipkin_attrs.trace_id,
                name=self.span_name,
                parent_id=self.zipkin_attrs.parent_span_id,
                span_id=self.zipkin_attrs.span_id,
                kind=Kind.CLIENT if self.client_context else Kind.SERVER,
                timestamp=self.start_timestamp,
                duration=stop_time - self.start_timestamp,
                local_endpoint=self.endpoint,
                remote_endpoint=self.remote_endpoint,
                shared=not self.report_root_timestamp,
                annotations=self.annotations,
                tags=self.tags,
            )
            span_sender.add_span(root_span)


class ZipkinBatchSender:
    """Context manager that batches encoded spans and flushes them to a
    transport handler, honoring both a span-count limit and — when the
    handler exposes one — a payload-size limit.
    """

    MAX_PORTION_SIZE = 100

    def __init__(
        self,
        transport_handler: Optional[TransportHandler],
        max_portion_size: Optional[int],
        encoder: IEncoder,
    ) -> None:
        self.transport_handler = transport_handler
        self.max_portion_size = (
            max_portion_size if max_portion_size else self.MAX_PORTION_SIZE
        )
        self.encoder = encoder

        # Only real transport handler objects can report a payload cap;
        # bare callables get no size limit.
        handler = self.transport_handler
        self.max_payload_bytes = (
            handler.get_max_payload_bytes()
            if isinstance(handler, BaseTransportHandler)
            else None
        )

    def __enter__(self) -> "ZipkinBatchSender":
        self._reset_queue()
        return self

    def __exit__(
        self,
        _exc_type: Optional[Type[BaseException]],
        _exc_value: Optional[BaseException],
        _exc_traceback: Optional[TracebackType],
    ) -> None:
        # Happy path: no exception — flush whatever is still queued.
        if not (_exc_type or _exc_value or _exc_traceback):
            self.flush()
            return

        assert _exc_type is not None
        assert _exc_value is not None
        assert _exc_traceback is not None
        # Wrap the original exception in a ZipkinError that pins down the
        # file and line where it was raised.
        filename = os.path.split(_exc_traceback.tb_frame.f_code.co_filename)[1]
        raise ZipkinError(
            "({}:{}) {}: {}".format(
                filename,
                _exc_traceback.tb_lineno,
                _exc_type.__name__,
                _exc_value,
            )
        )

    def _reset_queue(self) -> None:
        """Start a fresh, empty batch."""
        self.queue: List[Union[str, bytes]] = []
        self.current_size = 0

    def add_span(self, internal_span: Span) -> None:
        """Encode *internal_span* and enqueue it, flushing first if the
        current batch could not accommodate it."""
        encoded = self.encoder.encode_span(internal_span)

        # Flush-and-restart when either the span-count cap is reached or the
        # new span would push the payload past the handler's byte limit.
        over_count = len(self.queue) >= self.max_portion_size
        over_bytes = self.max_payload_bytes is not None and not self.encoder.fits(
            current_count=len(self.queue),
            current_size=self.current_size,
            max_size=self.max_payload_bytes,
            new_span=encoded,
        )
        if over_bytes or over_count:
            self.flush()

        self.queue.append(encoded)
        self.current_size += len(encoded)

    def flush(self) -> None:
        """Send the queued spans (if any) through the transport handler,
        then reset the batch."""
        if self.transport_handler and self.queue:
            self.transport_handler(self.encoder.encode_queue(self.queue))
        self._reset_queue()