File: test_interceptor.py

package info (click to toggle)
golang-github-grpc-ecosystem-grpc-opentracing 0.0~git20180507.8e809c8-3
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 592 kB
  • sloc: python: 2,021; java: 1,077; makefile: 2
file content (168 lines) | stat: -rw-r--r-- 7,006 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import unittest

import grpc
from grpc_opentracing import grpcext

from _service import Service


class ClientInterceptor(grpcext.UnaryClientInterceptor,
                        grpcext.StreamClientInterceptor):
    """Pass-through client interceptor that records whether it ran.

    ``intercepted`` starts as ``False`` and becomes ``True`` as soon as either
    hook is invoked. The call itself is handed to ``invoker`` unchanged.
    """

    def __init__(self):
        # Set to True by whichever interception hook runs first.
        self.intercepted = False

    def _record_and_forward(self, invoker, payload, metadata):
        """Mark this interceptor as hit and delegate to ``invoker``."""
        self.intercepted = True
        return invoker(payload, metadata)

    def intercept_unary(self, request, metadata, client_info, invoker):
        """Flag the interception and return ``invoker(request, metadata)``.

        ``client_info`` is accepted but not used.
        """
        return self._record_and_forward(invoker, request, metadata)

    def intercept_stream(self, request_or_iterator, metadata, client_info,
                         invoker):
        """Flag the interception and return
        ``invoker(request_or_iterator, metadata)``.

        ``client_info`` is accepted but not used.
        """
        return self._record_and_forward(invoker, request_or_iterator, metadata)


class ServerInterceptor(grpcext.UnaryServerInterceptor,
                        grpcext.StreamServerInterceptor):
    """Pass-through server interceptor that records whether it ran.

    ``intercepted`` starts as ``False`` and becomes ``True`` as soon as either
    hook is invoked. The call itself is handed to ``handler`` unchanged.
    """

    def __init__(self):
        # Set to True by whichever interception hook runs first.
        self.intercepted = False

    def _record_and_forward(self, handler, payload, servicer_context):
        """Mark this interceptor as hit and delegate to ``handler``."""
        self.intercepted = True
        return handler(payload, servicer_context)

    def intercept_unary(self, request, servicer_context, server_info, handler):
        """Flag the interception and return
        ``handler(request, servicer_context)``.

        ``server_info`` is accepted but not used.
        """
        return self._record_and_forward(handler, request, servicer_context)

    def intercept_stream(self, request_or_iterator, servicer_context,
                         server_info, handler):
        """Flag the interception and return
        ``handler(request_or_iterator, servicer_context)``.

        ``server_info`` is accepted but not used.
        """
        return self._record_and_forward(handler, request_or_iterator,
                                        servicer_context)


class InterceptorTest(unittest.TestCase):
    """Verify that RPCs pass through the client and server interceptors.

    Each test computes the expected response by calling the service's handler
    directly (with ``None`` as the context), performs the same RPC through
    the corresponding multi-callable, and then checks that the responses
    match and that both interceptors were hit.
    """

    def setUp(self):
        """Create one interceptor per side and a service wired to them."""
        self._client_interceptor = ClientInterceptor()
        self._server_interceptor = ServerInterceptor()
        client_side = [self._client_interceptor]
        server_side = [self._server_interceptor]
        self._service = Service(client_side, server_side)

    def _assert_both_intercepted(self):
        """Assert that the client, then the server, interceptor was hit."""
        self.assertTrue(self._client_interceptor.intercepted)
        self.assertTrue(self._server_interceptor.intercepted)

    def testUnaryUnaryInterception(self):
        """Unary-unary RPC made by calling the multi-callable directly."""
        rpc = self._service.unary_unary_multi_callable
        payload = b'\x01'
        expected = self._service.handler.handle_unary_unary(payload, None)
        actual = rpc(payload)

        self.assertEqual(actual, expected)
        self._assert_both_intercepted()

    def testUnaryUnaryInterceptionWithCall(self):
        """Unary-unary RPC made through ``with_call``; also checks status."""
        rpc = self._service.unary_unary_multi_callable
        payload = b'\x01'
        expected = self._service.handler.handle_unary_unary(payload, None)
        actual, call = rpc.with_call(payload)

        self.assertEqual(actual, expected)
        self.assertIs(grpc.StatusCode.OK, call.code())
        self._assert_both_intercepted()

    def testUnaryUnaryInterceptionFuture(self):
        """Unary-unary RPC made through ``future(...).result()``."""
        rpc = self._service.unary_unary_multi_callable
        payload = b'\x01'
        expected = self._service.handler.handle_unary_unary(payload, None)
        pending = rpc.future(payload)
        actual = pending.result()

        self.assertEqual(actual, expected)
        self._assert_both_intercepted()

    def testUnaryStreamInterception(self):
        """Unary-stream RPC; both response iterables are compared as lists."""
        rpc = self._service.unary_stream_multi_callable
        payload = b'\x01'
        expected_stream = self._service.handler.handle_unary_stream(payload,
                                                                    None)
        actual_stream = rpc(payload)

        # Drain the RPC's responses before checking the interceptor flags.
        received = list(actual_stream)
        expected = list(expected_stream)
        self.assertEqual(received, expected)
        self._assert_both_intercepted()

    def testStreamUnaryInterception(self):
        """Stream-unary RPC made by calling the multi-callable directly."""
        rpc = self._service.stream_unary_multi_callable
        payloads = [b'\x01', b'\x02']
        # A fresh iterator is handed to each consumer.
        expected = self._service.handler.handle_stream_unary(
            iter(payloads), None)
        actual = rpc(iter(payloads))

        self.assertEqual(actual, expected)
        self._assert_both_intercepted()

    def testStreamUnaryInterceptionWithCall(self):
        """Stream-unary RPC made through ``with_call``; also checks status."""
        rpc = self._service.stream_unary_multi_callable
        payloads = [b'\x01', b'\x02']
        expected = self._service.handler.handle_stream_unary(
            iter(payloads), None)
        actual, call = rpc.with_call(iter(payloads))

        self.assertEqual(actual, expected)
        self.assertIs(grpc.StatusCode.OK, call.code())
        self._assert_both_intercepted()

    def testStreamUnaryInterceptionFuture(self):
        """Stream-unary RPC made through ``future(...).result()``."""
        rpc = self._service.stream_unary_multi_callable
        payloads = [b'\x01', b'\x02']
        expected = self._service.handler.handle_stream_unary(
            iter(payloads), None)
        pending = rpc.future(iter(payloads))
        actual = pending.result()

        self.assertEqual(actual, expected)
        self._assert_both_intercepted()

    def testStreamStreamInterception(self):
        """Stream-stream RPC; both response iterables are compared as lists."""
        rpc = self._service.stream_stream_multi_callable
        payloads = [b'\x01', b'\x02']
        expected_stream = self._service.handler.handle_stream_stream(
            iter(payloads), None)
        actual_stream = rpc(iter(payloads))

        # Drain the RPC's responses before checking the interceptor flags.
        received = list(actual_stream)
        expected = list(expected_stream)
        self.assertEqual(received, expected)
        self._assert_both_intercepted()


class MultiInterceptorTest(unittest.TestCase):
    """Verify that several interceptors can be chained on each side."""

    def setUp(self):
        """Create two interceptors per side and a service wired to them."""
        self._client_interceptors = [ClientInterceptor(), ClientInterceptor()]
        self._server_interceptors = [ServerInterceptor(), ServerInterceptor()]
        self._service = Service(self._client_interceptors,
                                self._server_interceptors)

    def _clear(self):
        """Reset ``intercepted`` to ``False`` on every interceptor."""
        # Client interceptors are reset first, then server interceptors.
        every = self._client_interceptors + self._server_interceptors
        for interceptor in every:
            interceptor.intercepted = False

    def testUnaryUnaryMultiInterception(self):
        """A unary-unary RPC must hit every interceptor in both chains."""
        rpc = self._service.unary_unary_multi_callable
        payload = b'\x01'
        expected = self._service.handler.handle_unary_unary(payload, None)
        actual = rpc(payload)

        self.assertEqual(actual, expected)
        # Client interceptors are checked first, then server interceptors.
        every = self._client_interceptors + self._server_interceptors
        for interceptor in every:
            self.assertTrue(interceptor.intercepted)