File: _client.py

package info (click to toggle)
golang-github-grpc-ecosystem-grpc-opentracing 0.0~git20180507.8e809c8-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bookworm-proposed-updates, bullseye
  • size: 576 kB
  • sloc: python: 2,021; java: 1,077; makefile: 2
file content (209 lines) | stat: -rw-r--r-- 8,640 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
"""Implementation of the invocation-side open-tracing interceptor."""

import sys
import logging
import time

from six import iteritems

import grpc
from grpc_opentracing import grpcext
from grpc_opentracing._utilities import get_method_type, get_deadline_millis,\
    log_or_wrap_request_or_iterator, RpcInfo
import opentracing
from opentracing.ext import tags as ot_tags


class _GuardedSpan(object):

    def __init__(self, span):
        self.span = span
        self._engaged = True

    def __enter__(self):
        self.span.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        if self._engaged:
            return self.span.__exit__(*args, **kwargs)
        else:
            return False

    def release(self):
        self._engaged = False
        return self.span


def _inject_span_context(tracer, span, metadata):
    """Returns `metadata` extended with the propagation headers for `span`.

    The span's context is injected into a fresh dict using the HTTP_HEADERS
    format, and the resulting key/value pairs are appended to the metadata.

    Args:
      tracer: Tracer whose `inject` method writes the span context.
      span: Span whose `context` is injected; it also receives an error log
        entry if injection fails.
      metadata: Existing call metadata as an iterable of pairs, or None.

    Returns:
      A tuple holding the original metadata entries followed by the injected
      header pairs. If injection fails with one of the handled opentracing
      exceptions, the original `metadata` object is returned unchanged
      (possibly None).
    """
    carrier = {}
    try:
        tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier)
    except (opentracing.UnsupportedFormatException,
            opentracing.InvalidCarrierException,
            opentracing.SpanContextCorruptedException) as error:
        # Propagation is best-effort: note the failure and let the call go
        # ahead with the caller's metadata untouched.
        logging.exception('tracer.inject() failed')
        span.log_kv({'event': 'error', 'error.object': error})
        return metadata
    existing = tuple(metadata) if metadata is not None else ()
    return existing + tuple(carrier.items())


def _make_future_done_callback(span, rpc_info, log_payloads, span_decorator):
    """Builds the done-callback that records a future's outcome on `span`.

    Args:
      span: Span covering the call; the callback uses it as a context manager
        while recording the outcome.
      rpc_info: Mutable call record; receives either `response` or `error`.
      log_payloads: When truthy, a successful response is logged on the span.
      span_decorator: Optional callable invoked as
        ``span_decorator(span, rpc_info)`` after the outcome is recorded.

    Returns:
      A one-argument callable that takes the completed response future.
    """

    def callback(response_future):
        with span:
            status = response_future.code()
            if status != grpc.StatusCode.OK:
                # Failed call: tag the span and log the status (plus the
                # details string when there is one).
                span.set_tag('error', True)
                failure = {'event': 'error', 'error.kind': str(status)}
                details = response_future.details()
                if details is not None:
                    failure['message'] = details
                span.log_kv(failure)
                rpc_info.error = status
            else:
                # Successful call: keep the response on the record.
                response = response_future.result()
                rpc_info.response = response
                if log_payloads:
                    span.log_kv({'response': response})
            # Both outcomes finish by giving the decorator a look.
            if span_decorator is not None:
                span_decorator(span, rpc_info)

    return callback


class OpenTracingClientInterceptor(grpcext.UnaryClientInterceptor,
                                   grpcext.StreamClientInterceptor):
    """Invocation-side interceptor that records each RPC on a span.

    For every intercepted call a span is started (as a child of the active
    span when one is available), the span's context is injected into the call
    metadata, and the outcome of the call is recorded on the span.

    Args:
      tracer: Tracer used to start spans and to inject span contexts.
      active_span_source: Optional object whose `get_active_span()` supplies
        the parent span; may be None.
      log_payloads: When truthy, requests and responses are logged on the
        span.
      span_decorator: Optional callable invoked as
        ``span_decorator(span, rpc_info)`` once the outcome of a call has
        been recorded; may be None.
    """

    def __init__(self, tracer, active_span_source, log_payloads,
                 span_decorator):
        self._tracer = tracer
        self._active_span_source = active_span_source
        self._log_payloads = log_payloads
        self._span_decorator = span_decorator

    def _start_span(self, method):
        """Starts a client span named `method`, parented on the active span.

        The parent context is None when there is no active-span source or
        the source reports no active span.
        """
        active_span_context = None
        if self._active_span_source is not None:
            active_span = self._active_span_source.get_active_span()
            if active_span is not None:
                active_span_context = active_span.context
        tags = {
            ot_tags.COMPONENT: 'grpc',
            ot_tags.SPAN_KIND: ot_tags.SPAN_KIND_RPC_CLIENT
        }
        return self._tracer.start_span(
            operation_name=method, child_of=active_span_context, tags=tags)

    def _start_guarded_span(self, *args, **kwargs):
        """Like `_start_span`, but wraps the new span in a `_GuardedSpan`."""
        return _GuardedSpan(self._start_span(*args, **kwargs))

    def _record_error(self, span, rpc_info, error):
        """Marks `span` as failed with `error` and runs the span decorator.

        Args:
          span: Span on which the failure is tagged and logged.
          rpc_info: Call record whose `error` attribute is set.
          error: The exception instance raised by the call. The instance
            itself (not its class) is stored under 'error.object' so that
            its message and any attached status remain available.
        """
        span.set_tag('error', True)
        span.log_kv({'event': 'error', 'error.object': error})
        rpc_info.error = error
        if self._span_decorator is not None:
            self._span_decorator(span, rpc_info)

    def _trace_result(self, guarded_span, rpc_info, result):
        """Records the result of a successfully invoked call.

        Args:
          guarded_span: The `_GuardedSpan` covering the call.
          rpc_info: Call record that receives the response.
          result: Whatever the invoker returned: a `grpc.Future`, a tuple
            whose first element is the response, or the response itself.

        Returns:
          `result`, unchanged.
        """
        # If the RPC is called asynchronously, release the guard and add a
        # callback so that the span can be finished once the future is done.
        if isinstance(result, grpc.Future):
            span = guarded_span.release()
            result.add_done_callback(
                _make_future_done_callback(span, rpc_info, self._log_payloads,
                                           self._span_decorator))
            return result
        # Handle the case when the RPC is initiated via the with_call
        # method and the result is a tuple with the first element as the
        # response.
        # http://www.grpc.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.with_call
        response = result[0] if isinstance(result, tuple) else result
        rpc_info.response = response
        if self._log_payloads:
            guarded_span.span.log_kv({'response': response})
        if self._span_decorator is not None:
            self._span_decorator(guarded_span.span, rpc_info)
        return result

    def intercept_unary(self, request, metadata, client_info, invoker):
        """Traces a call that sends a single request.

        Args:
          request: The request value passed to the RPC.
          metadata: Call metadata, or None.
          client_info: Call description; `full_method` and `timeout` are
            read from it.
          invoker: Callable invoked as ``invoker(request, metadata)`` to
            continue the call.

        Returns:
          Whatever `invoker` returns.

        Raises:
          Anything `invoker` raises, after the failure has been recorded on
          the span.
        """
        with self._start_guarded_span(client_info.full_method) as guarded_span:
            span = guarded_span.span
            metadata = _inject_span_context(self._tracer, span, metadata)
            rpc_info = RpcInfo(
                full_method=client_info.full_method,
                metadata=metadata,
                timeout=client_info.timeout,
                request=request)
            if self._log_payloads:
                span.log_kv({'request': request})
            try:
                result = invoker(request, metadata)
            except BaseException as error:
                # Catch everything (as the tracing is purely observational)
                # and always re-raise so the caller sees the original error.
                self._record_error(span, rpc_info, error)
                raise
            return self._trace_result(guarded_span, rpc_info, result)

    def _intercept_server_stream(self, request_or_iterator, metadata,
                                 client_info, invoker):
        """Traces a response-streaming call, yielding each response.

        For RPCs that stream responses, the result can be a generator. To
        record the span across the generated responses and detect any
        errors, the result is wrapped in this generator, which yields the
        response values. Because this is a generator, the span is only
        started when iteration begins.
        """
        with self._start_span(client_info.full_method) as span:
            metadata = _inject_span_context(self._tracer, span, metadata)
            rpc_info = RpcInfo(
                full_method=client_info.full_method,
                metadata=metadata,
                timeout=client_info.timeout)
            # NOTE(review): the request is recorded only for client-streaming
            # calls here, while intercept_stream records it unconditionally;
            # confirm which matches RpcInfo's intended contract.
            if client_info.is_client_stream:
                rpc_info.request = request_or_iterator
            if self._log_payloads:
                request_or_iterator = log_or_wrap_request_or_iterator(
                    span, client_info.is_client_stream, request_or_iterator)
            try:
                result = invoker(request_or_iterator, metadata)
                for response in result:
                    if self._log_payloads:
                        span.log_kv({'response': response})
                    yield response
            except BaseException as error:
                # NOTE(review): this also sees GeneratorExit when the
                # consumer stops iterating early, so an abandoned stream is
                # recorded as an error; behaviour kept as-is.
                self._record_error(span, rpc_info, error)
                raise
            if self._span_decorator is not None:
                self._span_decorator(span, rpc_info)

    def intercept_stream(self, request_or_iterator, metadata, client_info,
                         invoker):
        """Traces a streaming call.

        Args:
          request_or_iterator: The request value, or an iterator of request
            values for request-streaming calls.
          metadata: Call metadata, or None.
          client_info: Call description; `full_method`, `timeout`,
            `is_client_stream` and `is_server_stream` are read from it.
          invoker: Callable invoked as
            ``invoker(request_or_iterator, metadata)`` to continue the call.

        Returns:
          For response-streaming calls, a generator over the responses;
          otherwise whatever `invoker` returns.

        Raises:
          Anything `invoker` raises, after the failure has been recorded on
          the span.
        """
        if client_info.is_server_stream:
            return self._intercept_server_stream(request_or_iterator, metadata,
                                                 client_info, invoker)
        with self._start_guarded_span(client_info.full_method) as guarded_span:
            span = guarded_span.span
            metadata = _inject_span_context(self._tracer, span, metadata)
            rpc_info = RpcInfo(
                full_method=client_info.full_method,
                metadata=metadata,
                timeout=client_info.timeout,
                request=request_or_iterator)
            if self._log_payloads:
                request_or_iterator = log_or_wrap_request_or_iterator(
                    span, client_info.is_client_stream, request_or_iterator)
            try:
                result = invoker(request_or_iterator, metadata)
            except BaseException as error:
                self._record_error(span, rpc_info, error)
                raise
            return self._trace_result(guarded_span, rpc_info, result)