File: __init__.py

package info (click to toggle)
sentry-python 2.18.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,004 kB
  • sloc: python: 55,908; makefile: 114; sh: 111; xml: 2
file content (151 lines) | stat: -rw-r--r-- 4,950 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from functools import wraps

import grpc
from grpc import Channel, Server, intercept_channel
from grpc.aio import Channel as AsyncChannel
from grpc.aio import Server as AsyncServer

from sentry_sdk.integrations import Integration

from .client import ClientInterceptor
from .server import ServerInterceptor
from .aio.server import ServerInterceptor as AsyncServerInterceptor
from .aio.client import (
    SentryUnaryUnaryClientInterceptor as AsyncUnaryUnaryClientInterceptor,
)
from .aio.client import (
    SentryUnaryStreamClientInterceptor as AsyncUnaryStreamClientIntercetor,
)

from typing import TYPE_CHECKING, Any, Optional, Sequence

# Compatibility shim: ``ParamSpec`` and a subscriptable ``Callable`` are only
# needed by static type checkers, so at runtime they are replaced by inert
# stand-ins instead of introducing a hard dependency on `typing_extensions`.
# Approach taken from: https://stackoverflow.com/a/71944042/300572
if not TYPE_CHECKING:

    class ParamSpec:
        """Runtime placeholder for ``typing.ParamSpec``; the name argument is ignored."""

        def __init__(self, _):
            # Only the attributes that annotations in this module touch.
            self.args = self.kwargs = None

    class _Callable:
        """Runtime placeholder whose subscription always evaluates to ``None``."""

        def __getitem__(self, _):
            return None

    # Single shared instance so that ``Callable[...]`` works in annotations.
    Callable = _Callable()
else:
    from typing import ParamSpec, Callable

P = ParamSpec("P")


def _wrap_channel_sync(func: Callable[P, Channel]) -> Callable[P, Channel]:
    """Wrap a synchronous (secure or insecure) channel factory.

    The returned factory builds the channel with ``func`` and, while
    ``ClientInterceptor._is_intercepted`` is still falsy, sets that flag and
    returns the channel wrapped with a new ``ClientInterceptor``. Once the
    flag is set, channels are returned exactly as ``func`` produced them.
    """

    @wraps(func)
    def patched_channel(*args: Any, **kwargs: Any) -> Channel:
        raw_channel = func(*args, **kwargs)

        # NOTE(review): the flag is read and written on the class itself, so
        # it looks like only the first channel created gets the interceptor
        # added here -- confirm this is intended.
        if ClientInterceptor._is_intercepted:
            return raw_channel

        ClientInterceptor._is_intercepted = True
        return intercept_channel(raw_channel, ClientInterceptor())

    return patched_channel


def _wrap_intercept_channel(func: Callable[P, Channel]) -> Callable[P, Channel]:
    """Wrap an ``intercept_channel``-style callable to drop duplicate Sentry interceptors.

    When ``ClientInterceptor._is_intercepted`` is set, every
    ``ClientInterceptor`` instance is removed from the interceptors passed by
    the caller before delegating. Otherwise the interceptors are forwarded
    unchanged.

    :param func: The callable being wrapped; it receives the channel followed
        by the (possibly filtered) interceptors as positional arguments.
    :returns: The wrapping function, carrying ``func``'s metadata.
    """

    @wraps(func)
    def patched_intercept_channel(channel: Channel, *interceptors: Any) -> Channel:
        if ClientInterceptor._is_intercepted:
            interceptors = tuple(
                interceptor
                for interceptor in interceptors
                if not isinstance(interceptor, ClientInterceptor)
            )
        # Delegate to the callable that was actually wrapped rather than to
        # the module-level `intercept_channel` captured at import time, so a
        # previously installed patch on `grpc.intercept_channel` is preserved.
        return func(channel, *interceptors)  # type: ignore

    return patched_intercept_channel  # type: ignore


def _wrap_channel_async(func: Callable[P, AsyncChannel]) -> Callable[P, AsyncChannel]:
    """Wrap an asynchronous (secure or insecure) channel factory.

    The returned factory accepts the same arguments as ``func``. It builds a
    new list holding one ``AsyncUnaryUnaryClientInterceptor`` and one
    ``AsyncUnaryStreamClientIntercetor`` followed by any caller-supplied
    ``interceptors``, and passes that list to ``func`` as the ``interceptors``
    keyword argument.

    :param func: The channel factory being wrapped.
    :returns: The wrapping factory, carrying ``func``'s metadata.
    """

    @wraps(func)
    def patched_channel(
        *args: P.args,
        interceptors: Optional[Sequence[grpc.aio.ClientInterceptor]] = None,
        **kwargs: P.kwargs,
    ) -> AsyncChannel:
        # Sentry's interceptors are placed ahead of the caller's; a missing
        # (None) or empty `interceptors` argument contributes nothing.
        combined = [
            AsyncUnaryUnaryClientInterceptor(),
            AsyncUnaryStreamClientIntercetor(),
            *(interceptors or ()),
        ]
        return func(*args, interceptors=combined, **kwargs)  # type: ignore

    return patched_channel  # type: ignore


def _wrap_sync_server(func: Callable[P, Server]) -> Callable[P, Server]:
    """Wrap the synchronous server factory.

    The returned factory removes any ``ServerInterceptor`` instances from the
    caller-supplied ``interceptors``, puts a newly created
    ``ServerInterceptor`` in front of the remaining ones, and passes the
    resulting list to ``func`` as the ``interceptors`` keyword argument.
    """

    @wraps(func)
    def patched_server(
        *args: P.args,
        interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None,
        **kwargs: P.kwargs,
    ) -> Server:
        # Keep only interceptors that are not Sentry's own; `None` counts as
        # "no interceptors".
        foreign_interceptors = [
            candidate
            for candidate in (interceptors or [])
            if not isinstance(candidate, ServerInterceptor)
        ]
        sentry_interceptor = ServerInterceptor()
        return func(  # type: ignore
            *args,
            interceptors=[sentry_interceptor, *foreign_interceptors],
            **kwargs,
        )

    return patched_server  # type: ignore


def _wrap_async_server(func: Callable[P, AsyncServer]) -> Callable[P, AsyncServer]:
    """Wrap the asynchronous server factory.

    The returned factory accepts the same arguments as ``func``. It puts a
    newly created ``AsyncServerInterceptor`` in front of any caller-supplied
    ``interceptors`` and passes the resulting tuple to ``func`` as the
    ``interceptors`` keyword argument.

    :param func: The server factory being wrapped.
    :returns: The wrapping factory, carrying ``func``'s metadata.
    """

    @wraps(func)
    def patched_aio_server(
        *args: P.args,
        interceptors: Optional[Sequence[grpc.aio.ServerInterceptor]] = None,
        **kwargs: P.kwargs,
    ) -> AsyncServer:
        server_interceptor = AsyncServerInterceptor()
        # A missing (None) or empty `interceptors` argument contributes nothing.
        interceptors = (server_interceptor, *(interceptors or []))
        return func(*args, interceptors=interceptors, **kwargs)  # type: ignore

    return patched_aio_server  # type: ignore


class GRPCIntegration(Integration):
    """Integration that replaces gRPC channel and server factories with wrapped versions."""

    identifier = "grpc"

    @staticmethod
    def setup_once() -> None:
        """Rebind the ``grpc`` and ``grpc.aio`` factory functions to their wrappers."""
        import grpc

        # (namespace, attribute name, wrapper factory), applied top to bottom.
        patches = (
            (grpc, "insecure_channel", _wrap_channel_sync),
            (grpc, "secure_channel", _wrap_channel_sync),
            (grpc, "intercept_channel", _wrap_intercept_channel),
            (grpc.aio, "insecure_channel", _wrap_channel_async),
            (grpc.aio, "secure_channel", _wrap_channel_async),
            (grpc, "server", _wrap_sync_server),
            (grpc.aio, "server", _wrap_async_server),
        )
        for namespace, attr_name, make_wrapper in patches:
            original = getattr(namespace, attr_name)
            setattr(namespace, attr_name, make_wrapper(original))