1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
|
from functools import wraps
import grpc
from grpc import Channel, Server, intercept_channel
from grpc.aio import Channel as AsyncChannel
from grpc.aio import Server as AsyncServer
from sentry_sdk.integrations import Integration
from .client import ClientInterceptor
from .server import ServerInterceptor
from .aio.server import ServerInterceptor as AsyncServerInterceptor
from .aio.client import (
SentryUnaryUnaryClientInterceptor as AsyncUnaryUnaryClientInterceptor,
)
from .aio.client import (
SentryUnaryStreamClientInterceptor as AsyncUnaryStreamClientIntercetor,
)
from typing import TYPE_CHECKING, Any, Optional, Sequence
# Compatibility shim: lets the annotations in this module use `ParamSpec`
# and a subscripted `Callable` on older Python versions without introducing
# a hard dependency on `typing_extensions`.
# from: https://stackoverflow.com/a/71944042/300572
if TYPE_CHECKING:
    from typing import ParamSpec, Callable
else:

    class ParamSpec:
        """Runtime stand-in for ``typing.ParamSpec``."""

        def __init__(self, _):
            # Only `.args` / `.kwargs` are referenced by annotations below;
            # both simply evaluate to None at runtime.
            self.args = self.kwargs = None

    class _Callable:
        """Runtime stand-in for ``typing.Callable``; any subscript yields None."""

        def __getitem__(self, _):
            return None

    # An instance (not the class) so that `Callable[...]` hits __getitem__.
    Callable = _Callable()

P = ParamSpec("P")
def _wrap_channel_sync(func: Callable[P, Channel]) -> Callable[P, Channel]:
    """Wrap a synchronous (secure or insecure) channel factory.

    The returned function calls ``func`` with the caller's arguments. If
    ``ClientInterceptor._is_intercepted`` is not yet set, it sets the flag
    and returns the new channel wrapped with a fresh ``ClientInterceptor``;
    otherwise it returns the channel exactly as ``func`` produced it.

    NOTE(review): the flag lives on the class and is never reset in this
    module, so only the first channel created through this wrapper gets the
    interceptor attached here -- confirm that is intended.
    """

    @wraps(func)
    def patched_channel(*args: Any, **kwargs: Any) -> Channel:
        raw_channel = func(*args, **kwargs)

        if ClientInterceptor._is_intercepted:
            return raw_channel

        # Mark before intercepting, mirroring the order callers rely on.
        ClientInterceptor._is_intercepted = True
        return intercept_channel(raw_channel, ClientInterceptor())

    return patched_channel
def _wrap_intercept_channel(func: Callable[P, Channel]) -> Callable[P, Channel]:
    """Wrap ``intercept_channel`` so the Sentry interceptor is not added twice.

    When ``ClientInterceptor._is_intercepted`` is set, every
    ``ClientInterceptor`` instance is removed from the interceptors the
    caller passed; all other interceptors are forwarded unchanged and in
    their original order. When the flag is not set, the arguments are
    forwarded untouched.

    The call is delegated to ``func`` (the callable being wrapped) rather
    than to the module-level ``intercept_channel`` import, so that a
    previously installed patch of ``grpc.intercept_channel`` is preserved.
    """

    @wraps(func)
    def patched_intercept_channel(channel: Channel, *interceptors: Any) -> Channel:
        # `interceptors` are client-side interceptors; there is no single
        # sync gRPC base class for them, hence the loose annotation.
        if ClientInterceptor._is_intercepted:
            interceptors = tuple(
                interceptor
                for interceptor in interceptors
                if not isinstance(interceptor, ClientInterceptor)
            )
        return func(channel, *interceptors)  # type: ignore

    return patched_intercept_channel  # type: ignore
def _wrap_channel_async(func: Callable[P, AsyncChannel]) -> Callable[P, AsyncChannel]:
    """Wrap an asynchronous (secure or insecure) channel factory.

    The returned function accepts the same arguments as ``func``. It builds
    a new unary-unary and a new unary-stream Sentry client interceptor on
    every call, places them ahead of any interceptors supplied through the
    ``interceptors`` keyword, and passes the combined list on to ``func``.
    """

    @wraps(func)
    def patched_channel(
        *args: P.args,
        interceptors: Optional[Sequence[grpc.aio.ClientInterceptor]] = None,
        **kwargs: P.kwargs,
    ) -> AsyncChannel:
        # Alias spelling ("Intercetor") matches the import at the top of the file.
        sentry_interceptors = [
            AsyncUnaryUnaryClientInterceptor(),
            AsyncUnaryStreamClientIntercetor(),
        ]
        # Sentry's interceptors first; caller-supplied ones keep their order.
        interceptors = [*sentry_interceptors, *(interceptors or [])]
        return func(*args, interceptors=interceptors, **kwargs)  # type: ignore

    return patched_channel  # type: ignore
def _wrap_sync_server(func: Callable[P, Server]) -> Callable[P, Server]:
    """Wrap the synchronous server factory to install the Sentry interceptor.

    The returned function drops any ``ServerInterceptor`` instances from the
    ``interceptors`` keyword argument, prepends a freshly created
    ``ServerInterceptor`` and forwards the resulting list to ``func``.

    NOTE(review): interceptors passed positionally end up in ``*args`` and
    are not seen by this filter -- confirm callers always use the keyword.
    """

    @wraps(func)
    def patched_server(
        *args: P.args,
        interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None,
        **kwargs: P.kwargs,
    ) -> Server:
        # Keep only interceptors that are not Sentry's own.
        foreign = [
            candidate
            for candidate in (interceptors or [])
            if not isinstance(candidate, ServerInterceptor)
        ]
        combined = [ServerInterceptor(), *foreign]
        return func(*args, interceptors=combined, **kwargs)  # type: ignore

    return patched_server  # type: ignore
def _wrap_async_server(func: Callable[P, AsyncServer]) -> Callable[P, AsyncServer]:
    """Wrap the asynchronous server factory to install the Sentry interceptor.

    The returned function prepends a freshly created
    ``AsyncServerInterceptor`` to whatever was supplied through the
    ``interceptors`` keyword and forwards the result to ``func`` as a tuple.
    Unlike the synchronous wrapper, interceptors supplied by the caller are
    not filtered.
    """

    @wraps(func)
    def patched_aio_server(
        *args: P.args,
        interceptors: Optional[Sequence[grpc.aio.ServerInterceptor]] = None,
        **kwargs: P.kwargs,
    ) -> AsyncServer:
        server_interceptor = AsyncServerInterceptor()
        # Passed on as a tuple, not a list, exactly as before.
        interceptors = (server_interceptor, *(interceptors or []))
        return func(*args, interceptors=interceptors, **kwargs)  # type: ignore

    return patched_aio_server  # type: ignore
class GRPCIntegration(Integration):
    """Integration that patches gRPC's channel and server factories."""

    identifier = "grpc"

    @staticmethod
    def setup_once() -> None:
        """Replace the sync and async gRPC factories with wrapped versions."""
        import grpc

        # (owning module, attribute name, wrapper factory), applied in order.
        patch_table = (
            (grpc, "insecure_channel", _wrap_channel_sync),
            (grpc, "secure_channel", _wrap_channel_sync),
            (grpc, "intercept_channel", _wrap_intercept_channel),
            (grpc.aio, "insecure_channel", _wrap_channel_async),
            (grpc.aio, "secure_channel", _wrap_channel_async),
            (grpc, "server", _wrap_sync_server),
            (grpc.aio, "server", _wrap_async_server),
        )
        for owner, attr_name, make_wrapper in patch_table:
            original = getattr(owner, attr_name)
            setattr(owner, attr_name, make_wrapper(original))
|