File: channelz_grpc.py

package info (click to toggle)
python-grpclib 0.4.8-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 484 kB
  • sloc: python: 3,370; makefile: 2
file content (140 lines) | stat: -rw-r--r-- 6,401 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Generated by the Protocol Buffers compiler. DO NOT EDIT!
# source: grpclib/channelz/v1/channelz.proto
# plugin: grpclib.plugin.main
import abc
import typing

import grpclib.const
import grpclib.client
if typing.TYPE_CHECKING:
    import grpclib.server

import google.protobuf.any_pb2
import google.protobuf.duration_pb2
import google.protobuf.timestamp_pb2
import google.protobuf.wrappers_pb2
import grpclib.channelz.v1.channelz_pb2


class ChannelzBase(abc.ABC):
    """Abstract server-side base for the ``grpc.channelz.v1.Channelz`` service.

    A concrete service overrides one coroutine per RPC; each coroutine is
    handed the call's ``stream``. ``__mapping__`` publishes those coroutines
    as ``grpclib.const.Handler`` entries keyed by method path.
    """

    @abc.abstractmethod
    async def GetTopChannels(
        self,
        stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetTopChannelsRequest, grpclib.channelz.v1.channelz_pb2.GetTopChannelsResponse]',
    ) -> None:
        """Serve ``/grpc.channelz.v1.Channelz/GetTopChannels``; must be overridden."""

    @abc.abstractmethod
    async def GetServers(
        self,
        stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetServersRequest, grpclib.channelz.v1.channelz_pb2.GetServersResponse]',
    ) -> None:
        """Serve ``/grpc.channelz.v1.Channelz/GetServers``; must be overridden."""

    @abc.abstractmethod
    async def GetServer(
        self,
        stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetServerRequest, grpclib.channelz.v1.channelz_pb2.GetServerResponse]',
    ) -> None:
        """Serve ``/grpc.channelz.v1.Channelz/GetServer``; must be overridden."""

    @abc.abstractmethod
    async def GetServerSockets(
        self,
        stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetServerSocketsRequest, grpclib.channelz.v1.channelz_pb2.GetServerSocketsResponse]',
    ) -> None:
        """Serve ``/grpc.channelz.v1.Channelz/GetServerSockets``; must be overridden."""

    @abc.abstractmethod
    async def GetChannel(
        self,
        stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetChannelRequest, grpclib.channelz.v1.channelz_pb2.GetChannelResponse]',
    ) -> None:
        """Serve ``/grpc.channelz.v1.Channelz/GetChannel``; must be overridden."""

    @abc.abstractmethod
    async def GetSubchannel(
        self,
        stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetSubchannelRequest, grpclib.channelz.v1.channelz_pb2.GetSubchannelResponse]',
    ) -> None:
        """Serve ``/grpc.channelz.v1.Channelz/GetSubchannel``; must be overridden."""

    @abc.abstractmethod
    async def GetSocket(
        self,
        stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetSocketRequest, grpclib.channelz.v1.channelz_pb2.GetSocketResponse]',
    ) -> None:
        """Serve ``/grpc.channelz.v1.Channelz/GetSocket``; must be overridden."""

    def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
        """Return the method-path -> ``Handler`` table for this service.

        Every entry is registered with ``Cardinality.UNARY_UNARY`` and pairs
        the bound coroutine with its request and response message classes.
        """
        pb2 = grpclib.channelz.v1.channelz_pb2
        unary_unary = grpclib.const.Cardinality.UNARY_UNARY
        # (method path, bound coroutine, request class, response class)
        routes = (
            ('/grpc.channelz.v1.Channelz/GetTopChannels', self.GetTopChannels,
             pb2.GetTopChannelsRequest, pb2.GetTopChannelsResponse),
            ('/grpc.channelz.v1.Channelz/GetServers', self.GetServers,
             pb2.GetServersRequest, pb2.GetServersResponse),
            ('/grpc.channelz.v1.Channelz/GetServer', self.GetServer,
             pb2.GetServerRequest, pb2.GetServerResponse),
            ('/grpc.channelz.v1.Channelz/GetServerSockets', self.GetServerSockets,
             pb2.GetServerSocketsRequest, pb2.GetServerSocketsResponse),
            ('/grpc.channelz.v1.Channelz/GetChannel', self.GetChannel,
             pb2.GetChannelRequest, pb2.GetChannelResponse),
            ('/grpc.channelz.v1.Channelz/GetSubchannel', self.GetSubchannel,
             pb2.GetSubchannelRequest, pb2.GetSubchannelResponse),
            ('/grpc.channelz.v1.Channelz/GetSocket', self.GetSocket,
             pb2.GetSocketRequest, pb2.GetSocketResponse),
        )
        return {
            path: grpclib.const.Handler(func, unary_unary, request_type, reply_type)
            for path, func, request_type, reply_type in routes
        }


class ChannelzStub:
    """Client-side stub for the ``grpc.channelz.v1.Channelz`` service.

    Each RPC is exposed as an instance attribute holding a
    ``grpclib.client.UnaryUnaryMethod`` bound to the supplied channel.
    """

    def __init__(self, channel: grpclib.client.Channel) -> None:
        """Create one unary-unary method object per Channelz RPC on ``channel``."""
        pb2 = grpclib.channelz.v1.channelz_pb2
        unary_unary = grpclib.client.UnaryUnaryMethod

        self.GetTopChannels = unary_unary(
            channel, '/grpc.channelz.v1.Channelz/GetTopChannels',
            pb2.GetTopChannelsRequest, pb2.GetTopChannelsResponse,
        )
        self.GetServers = unary_unary(
            channel, '/grpc.channelz.v1.Channelz/GetServers',
            pb2.GetServersRequest, pb2.GetServersResponse,
        )
        self.GetServer = unary_unary(
            channel, '/grpc.channelz.v1.Channelz/GetServer',
            pb2.GetServerRequest, pb2.GetServerResponse,
        )
        self.GetServerSockets = unary_unary(
            channel, '/grpc.channelz.v1.Channelz/GetServerSockets',
            pb2.GetServerSocketsRequest, pb2.GetServerSocketsResponse,
        )
        self.GetChannel = unary_unary(
            channel, '/grpc.channelz.v1.Channelz/GetChannel',
            pb2.GetChannelRequest, pb2.GetChannelResponse,
        )
        self.GetSubchannel = unary_unary(
            channel, '/grpc.channelz.v1.Channelz/GetSubchannel',
            pb2.GetSubchannelRequest, pb2.GetSubchannelResponse,
        )
        self.GetSocket = unary_unary(
            channel, '/grpc.channelz.v1.Channelz/GetSocket',
            pb2.GetSocketRequest, pb2.GetSocketResponse,
        )