File: test_stats.py

package info (click to toggle)
python-grpclib 0.4.9-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 676 kB
  • sloc: python: 6,864; makefile: 2
file content (210 lines) | stat: -rw-r--r-- 7,803 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import time
import asyncio
from unittest.mock import patch

import pytest

from grpclib.const import Status
from grpclib.client import _ChannelState, Channel
from grpclib.testing import ChannelFor
from grpclib.exceptions import GRPCError

from dummy_pb2 import DummyRequest, DummyReply
from dummy_grpc import DummyServiceBase, DummyServiceStub


def _is_recent(value):
    """Assert that ``value`` is a ``time.monotonic()`` reading under 1 s old.

    Returns nothing; fails with ``AssertionError`` when the reading is one
    second old or older.
    """
    elapsed = time.monotonic() - value
    assert elapsed < 1


class DummyService(DummyServiceBase):
    """Baseline service: every handler raises ``GRPCError(Status.UNIMPLEMENTED)``.

    The service subclasses in this file override ``UnaryUnary`` only and
    inherit the remaining handlers from here.
    """

    async def UnaryUnary(self, stream):
        raise GRPCError(Status.UNIMPLEMENTED)

    async def UnaryStream(self, stream):
        raise GRPCError(Status.UNIMPLEMENTED)

    async def StreamUnary(self, stream):
        raise GRPCError(Status.UNIMPLEMENTED)

    async def StreamStream(self, stream):
        raise GRPCError(Status.UNIMPLEMENTED)


class WorkingDummyService(DummyService):
    """Service whose ``UnaryUnary`` answers every request with ``'test'``."""

    async def UnaryUnary(self, stream):
        """Consume the incoming request, then send a fixed reply."""
        # The request content is irrelevant; it is read and discarded.
        await stream.recv_message()
        canned_reply = DummyReply(value='test')
        await stream.send_message(canned_reply)


class FailingDummyService(DummyService):
    """Service whose ``UnaryUnary`` fails with a plain ``Exception``."""

    async def UnaryUnary(self, stream):
        # Deliberately not a GRPCError: test_channel_calls_failed expects the
        # client to see a GRPCError matching 'Internal Server Error' instead.
        raise Exception('unexpected')


@pytest.mark.asyncio
async def test_channel_calls_succeeded():
    """A successful unary call bumps the started and succeeded counters only."""
    async with ChannelFor([WorkingDummyService()]) as channel:

        def call_stats():
            # Snapshot of the channel counters: (started, succeeded, failed).
            return (
                channel._calls_started,
                channel._calls_succeeded,
                channel._calls_failed,
            )

        stub = DummyServiceStub(channel)

        # Nothing is recorded before the first call.
        assert call_stats() == (0, 0, 0)
        assert channel._last_call_started is None

        request = DummyRequest(value='whatever')
        reply = await stub.UnaryUnary(request)

        assert call_stats() == (1, 1, 0)
        _is_recent(channel._last_call_started)

    assert reply == DummyReply(value='test')


@pytest.mark.asyncio
async def test_channel_calls_failed():
    """A call that errors on the server bumps the started and failed counters."""
    async with ChannelFor([FailingDummyService()]) as channel:

        def call_stats():
            # Snapshot of the channel counters: (started, succeeded, failed).
            return (
                channel._calls_started,
                channel._calls_succeeded,
                channel._calls_failed,
            )

        stub = DummyServiceStub(channel)

        # Nothing is recorded before the first call.
        assert call_stats() == (0, 0, 0)
        assert channel._last_call_started is None

        request = DummyRequest(value='whatever')
        with pytest.raises(GRPCError, match='Internal Server Error'):
            await stub.UnaryUnary(request)

        assert call_stats() == (1, 0, 1)
        _is_recent(channel._last_call_started)


@pytest.mark.asyncio
async def test_channel_ready(loop):
    """Check the channel state sequence IDLE -> CONNECTING -> READY -> IDLE.

    A fresh ``Channel`` gets its ``_create_connection`` patched with a
    coroutine that sleeps briefly and then returns the protocol of an
    in-process ``ChannelFor`` channel.

    ``loop`` is a fixture defined outside this file; it is only used here to
    schedule the connect task (presumably the running event loop -- confirm
    in conftest).
    """
    async with ChannelFor([WorkingDummyService()]) as _channel:
        channel = Channel()

        async def _create_connection():
            # The delay keeps the connect attempt pending long enough for the
            # CONNECTING state to be observed below.
            await asyncio.sleep(0.01)
            return _channel._protocol

        with patch.object(channel, '_create_connection', _create_connection):
            assert channel._state is _ChannelState.IDLE
            connect_task = loop.create_task(channel.__connect__())
            # Shorter than the 0.01 s delay above, so the connect task should
            # still be waiting. NOTE(review): timing-based.
            await asyncio.sleep(0.001)
            assert channel._state is _ChannelState.CONNECTING
            await connect_task
            assert channel._state is _ChannelState.READY

        # Closing is expected to bring the channel back to IDLE.
        channel.close()
        assert channel._state is _ChannelState.IDLE


@pytest.mark.asyncio
async def test_channel_transient_failure(loop):
    """Check IDLE -> CONNECTING -> TRANSIENT_FAILURE -> IDLE on connect error.

    ``_create_connection`` is patched with a coroutine that sleeps briefly
    and then raises ``asyncio.TimeoutError``.

    ``loop`` is a fixture defined outside this file; it is only used here to
    schedule the connect task (presumably the running event loop -- confirm
    in conftest).
    """
    channel = Channel()

    async def _create_connection():
        # The delay keeps the connect attempt pending long enough for the
        # CONNECTING state to be observed below.
        await asyncio.sleep(0.01)
        raise asyncio.TimeoutError('try again later')

    with patch.object(channel, '_create_connection', _create_connection):
        assert channel._state is _ChannelState.IDLE
        connect_task = loop.create_task(channel.__connect__())
        # Shorter than the 0.01 s delay above, so the connect task should
        # still be waiting. NOTE(review): timing-based.
        await asyncio.sleep(0.001)
        assert channel._state is _ChannelState.CONNECTING
        # The error from the patched coroutine must reach the awaiting caller.
        with pytest.raises(asyncio.TimeoutError, match='try again later'):
            await connect_task
        assert channel._state is _ChannelState.TRANSIENT_FAILURE

    # Closing is expected to bring the channel back to IDLE.
    channel.close()
    assert channel._state is _ChannelState.IDLE


@pytest.mark.asyncio
async def test_client_stream():
    """Check client-side stream and connection statistics over one call.

    The test walks through a single ``UnaryUnary`` stream in three steps
    (send request, send message, receive message) and asserts the relevant
    counters and timestamps before and after each step.
    """
    async with ChannelFor([WorkingDummyService()]) as channel:
        proto = await channel.__connect__()
        stub = DummyServiceStub(channel)

        async with stub.UnaryUnary.open() as stream:
            # Step 1: opening the stream object alone records nothing;
            # sending the request is what counts as a started stream.
            assert proto.connection.streams_started == 0
            assert proto.connection.streams_succeeded == 0
            assert proto.connection.streams_failed == 0
            assert proto.connection.last_stream_created is None
            await stream.send_request()
            _is_recent(stream._stream.created)

            assert proto.connection.streams_started == 1
            assert proto.connection.streams_succeeded == 0
            assert proto.connection.streams_failed == 0
            _is_recent(proto.connection.last_stream_created)

            # Step 2: outgoing message and data counters, per stream and
            # per connection.
            assert stream._messages_sent == 0
            assert stream._stream.data_sent == 0
            assert proto.connection.messages_sent == 0
            assert proto.connection.data_sent == 0
            assert proto.connection.last_message_sent is None
            await stream.send_message(DummyRequest(value='whatever'), end=True)
            assert stream._messages_sent == 1
            assert stream._stream.data_sent > 0
            assert proto.connection.messages_sent == 1
            assert proto.connection.data_sent > 0
            _is_recent(proto.connection.last_message_sent)
            _is_recent(proto.connection.last_data_sent)

            # Step 3: incoming message and data counters, per stream and
            # per connection.
            assert stream._messages_received == 0
            assert stream._stream.data_received == 0
            assert proto.connection.messages_received == 0
            assert proto.connection.data_received == 0
            assert proto.connection.last_message_received is None
            reply = await stream.recv_message()
            assert stream._messages_received == 1
            assert stream._stream.data_received > 0
            assert proto.connection.messages_received == 1
            assert proto.connection.data_received > 0
            _is_recent(proto.connection.last_message_received)
            _is_recent(proto.connection.last_data_received)

        # Only after the stream context exits is the stream counted as
        # succeeded.
        assert proto.connection.streams_started == 1
        assert proto.connection.streams_succeeded == 1
        assert proto.connection.streams_failed == 0
    assert reply == DummyReply(value='test')


@pytest.mark.asyncio
async def test_server_stream():
    """Check server-side stream and connection statistics over one call.

    The server-side stream object is obtained from the handler's ``_tasks``
    mapping of the in-process server created by ``ChannelFor``.
    """
    cf = ChannelFor([WorkingDummyService()])
    async with cf as channel:
        stub = DummyServiceStub(channel)
        # Unpacking asserts there is exactly one handler.
        handler, = cf._server._handlers

        async with stub.UnaryUnary.open() as stream:
            await stream.send_request()
            # Poll until the handler has a task for the new stream.
            # NOTE(review): no upper bound on this wait.
            while not handler._tasks:
                await asyncio.sleep(0.001)

            # Unpacking asserts there is exactly one tracked stream.
            server_stream, = handler._tasks.keys()
            _is_recent(server_stream.created)
            assert server_stream.data_sent == 0
            assert server_stream.data_received == 0
            assert server_stream.connection.messages_sent == 0
            assert server_stream.connection.messages_received == 0
            assert server_stream.connection.last_message_sent is None
            assert server_stream.connection.last_message_received is None
            await stream.send_message(DummyRequest(value='whatever'), end=True)
            # Fixed pause before inspecting the server side; the assertions
            # below expect the request received and the reply already sent.
            # NOTE(review): timing-based.
            await asyncio.sleep(0.01)
            assert server_stream.data_sent > 0
            assert server_stream.data_received > 0
            assert server_stream.connection.messages_sent == 1
            assert server_stream.connection.messages_received == 1
            _is_recent(server_stream.connection.last_data_sent)
            _is_recent(server_stream.connection.last_data_received)
            _is_recent(server_stream.connection.last_message_sent)
            _is_recent(server_stream.connection.last_message_received)

            reply = await stream.recv_message()
    assert reply == DummyReply(value='test')