File: test_client_events.py

package info (click to toggle)
python-grpclib 0.4.9-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 676 kB
  • sloc: python: 6,864; makefile: 2
file content (102 lines) | stat: -rw-r--r-- 3,230 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from contextlib import nullcontext

import pytest

from multidict import MultiDict
from google.rpc.error_details_pb2 import ResourceInfo

from grpclib.const import Status
from grpclib.events import listen, SendRequest, SendMessage, RecvMessage
from grpclib.events import RecvInitialMetadata, RecvTrailingMetadata
from grpclib.testing import ChannelFor
from grpclib.exceptions import GRPCError

from dummy_pb2 import DummyRequest, DummyReply
from dummy_grpc import DummyServiceStub, DummyServiceBase


class DummyService(DummyServiceBase):

    def __init__(self, fail=False):
        self.fail = fail

    async def UnaryUnary(self, stream):
        await stream.recv_message()
        await stream.send_initial_metadata(metadata={'initial': 'true'})
        await stream.send_message(DummyReply(value='pong'))
        if self.fail:
            await stream.send_trailing_metadata(
                status=Status.NOT_FOUND,
                status_message="Everything is not OK",
                status_details=[ResourceInfo()],
                metadata={'trailing': 'true'},
            )
        else:
            await stream.send_trailing_metadata(metadata={'trailing': 'true'})

    async def UnaryStream(self, stream):
        raise GRPCError(Status.UNIMPLEMENTED)

    async def StreamUnary(self, stream):
        raise GRPCError(Status.UNIMPLEMENTED)

    async def StreamStream(self, stream):
        raise GRPCError(Status.UNIMPLEMENTED)


async def _test(event_type, *, fail=False):
    service = DummyService(fail)
    events = []

    async def callback(event_):
        events.append(event_)

    async with ChannelFor([service]) as channel:
        listen(channel, event_type, callback)
        stub = DummyServiceStub(channel)

        ctx = pytest.raises(GRPCError) if fail else nullcontext()
        with ctx:
            reply = await stub.UnaryUnary(DummyRequest(value='ping'),
                                          timeout=1,
                                          metadata={'request': 'true'})
            assert reply == DummyReply(value='pong')

    event, = events
    return event


@pytest.mark.asyncio
async def test_send_request():
    event = await _test(SendRequest)
    assert event.metadata == MultiDict({'request': 'true'})
    assert event.method_name == '/dummy.DummyService/UnaryUnary'
    assert event.deadline.time_remaining() > 0
    assert event.content_type == 'application/grpc'


@pytest.mark.asyncio
async def test_send_message():
    event = await _test(SendMessage)
    assert event.message == DummyRequest(value='ping')


@pytest.mark.asyncio
async def test_recv_message():
    event = await _test(RecvMessage)
    assert event.message == DummyReply(value='pong')


@pytest.mark.asyncio
async def test_recv_initial_metadata():
    event = await _test(RecvInitialMetadata)
    assert event.metadata == MultiDict({'initial': 'true'})


@pytest.mark.asyncio
async def test_recv_trailing_metadata():
    event = await _test(RecvTrailingMetadata, fail=True)
    assert event.metadata == MultiDict({'trailing': 'true'})
    assert event.status is Status.NOT_FOUND
    assert event.status_message == "Everything is not OK"
    assert isinstance(event.status_details[0], ResourceInfo)