File: test_memory.py

package info (click to toggle)
python-grpclib 0.4.9-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 676 kB
  • sloc: python: 6,864; makefile: 2
file content (107 lines) | stat: -rw-r--r-- 2,932 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import gc
import asyncio.tasks

import pytest

from grpclib.const import Status
from grpclib.testing import ChannelFor
from grpclib.exceptions import GRPCError

from conn import ClientServer
from dummy_pb2 import DummyRequest, DummyReply
from dummy_grpc import DummyServiceBase, DummyServiceStub


class DummyService(DummyServiceBase):

    async def UnaryUnary(self, stream):
        request = await stream.recv_message()
        assert request == DummyRequest(value='ping')
        await stream.send_message(DummyReply(value='pong'))

    async def UnaryStream(self, stream):
        raise GRPCError(Status.UNIMPLEMENTED)

    async def StreamUnary(self, stream):
        raise GRPCError(Status.UNIMPLEMENTED)

    async def StreamStream(self, stream):
        raise GRPCError(Status.UNIMPLEMENTED)


def collect():
    objects = gc.get_objects()
    return {id(obj): obj for obj in objects}


def _check(type_name):
    """Utility function to debug references"""
    import objgraph

    objects = objgraph.by_type(type_name)
    if objects:
        obj = objects[0]
        objgraph.show_backrefs(obj, max_depth=3, filename='graph.png')


def test_connection():
    loop = asyncio.new_event_loop()

    async def example():
        async with ChannelFor([DummyService()]) as channel:
            stub = DummyServiceStub(channel)
            await stub.UnaryUnary(DummyRequest(value='ping'))

    # warm up
    loop.run_until_complete(example())

    gc.collect()
    gc.disable()
    try:
        pre = set(collect())
        loop.run_until_complete(example())
        loop.stop()
        loop.close()
        post = collect()

        diff = set(post).difference(pre)
        diff.discard(id(pre))
        diff.discard(id(asyncio.tasks._current_tasks))
        if diff:
            for i in diff:
                try:
                    print(post[i])
                except Exception:
                    print('...')
            raise AssertionError('Memory leak detected')
    finally:
        gc.enable()


@pytest.mark.asyncio
async def test_stream():
    cs = ClientServer(DummyService, DummyServiceStub)
    async with cs as (_, stub):
        await stub.UnaryUnary(DummyRequest(value='ping'))
        handler = next(iter(cs.server._handlers))
        handler.__gc_collect__()
        gc.collect()
        gc.disable()
        try:
            pre = set(collect())
            await stub.UnaryUnary(DummyRequest(value='ping'))
            handler.__gc_collect__()
            post = collect()

            diff = set(post).difference(pre)
            diff.discard(id(pre))
            for i in diff:
                try:
                    print(repr(post[i])[:120])
                except Exception:
                    print('...')
                else:
                    if 'grpclib.' in repr(post[i]):
                        raise AssertionError('Memory leak detected')
        finally:
            gc.enable()