1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
# Generated by the Protocol Buffers compiler. DO NOT EDIT!
# source: grpclib/channelz/v1/channelz.proto
# plugin: grpclib.plugin.main
import abc
import typing
import grpclib.const
import grpclib.client
if typing.TYPE_CHECKING:
import grpclib.server
import google.protobuf.any_pb2
import google.protobuf.duration_pb2
import google.protobuf.timestamp_pb2
import google.protobuf.wrappers_pb2
import grpclib.channelz.v1.channelz_pb2
class ChannelzBase(abc.ABC):
    """Abstract server-side base for the ``grpc.channelz.v1.Channelz`` service.

    Subclasses implement one coroutine per RPC; ``__mapping__`` exposes those
    coroutines as ``grpclib.const.Handler`` entries keyed by RPC path.
    """

    @abc.abstractmethod
    async def GetTopChannels(self, stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetTopChannelsRequest, grpclib.channelz.v1.channelz_pb2.GetTopChannelsResponse]') -> None:
        """Handle the ``GetTopChannels`` RPC on the given stream."""

    @abc.abstractmethod
    async def GetServers(self, stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetServersRequest, grpclib.channelz.v1.channelz_pb2.GetServersResponse]') -> None:
        """Handle the ``GetServers`` RPC on the given stream."""

    @abc.abstractmethod
    async def GetServer(self, stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetServerRequest, grpclib.channelz.v1.channelz_pb2.GetServerResponse]') -> None:
        """Handle the ``GetServer`` RPC on the given stream."""

    @abc.abstractmethod
    async def GetServerSockets(self, stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetServerSocketsRequest, grpclib.channelz.v1.channelz_pb2.GetServerSocketsResponse]') -> None:
        """Handle the ``GetServerSockets`` RPC on the given stream."""

    @abc.abstractmethod
    async def GetChannel(self, stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetChannelRequest, grpclib.channelz.v1.channelz_pb2.GetChannelResponse]') -> None:
        """Handle the ``GetChannel`` RPC on the given stream."""

    @abc.abstractmethod
    async def GetSubchannel(self, stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetSubchannelRequest, grpclib.channelz.v1.channelz_pb2.GetSubchannelResponse]') -> None:
        """Handle the ``GetSubchannel`` RPC on the given stream."""

    @abc.abstractmethod
    async def GetSocket(self, stream: 'grpclib.server.Stream[grpclib.channelz.v1.channelz_pb2.GetSocketRequest, grpclib.channelz.v1.channelz_pb2.GetSocketResponse]') -> None:
        """Handle the ``GetSocket`` RPC on the given stream."""

    def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
        """Return the RPC-path -> ``Handler`` table for this service.

        Every entry pairs a bound handler coroutine with the ``UNARY_UNARY``
        cardinality and the request/response message classes for that RPC.
        """
        # Short local names for the long dotted paths repeated in every entry.
        pb2 = grpclib.channelz.v1.channelz_pb2
        unary_unary = grpclib.const.Cardinality.UNARY_UNARY
        handler = grpclib.const.Handler

        return {
            '/grpc.channelz.v1.Channelz/GetTopChannels': handler(
                self.GetTopChannels, unary_unary,
                pb2.GetTopChannelsRequest, pb2.GetTopChannelsResponse,
            ),
            '/grpc.channelz.v1.Channelz/GetServers': handler(
                self.GetServers, unary_unary,
                pb2.GetServersRequest, pb2.GetServersResponse,
            ),
            '/grpc.channelz.v1.Channelz/GetServer': handler(
                self.GetServer, unary_unary,
                pb2.GetServerRequest, pb2.GetServerResponse,
            ),
            '/grpc.channelz.v1.Channelz/GetServerSockets': handler(
                self.GetServerSockets, unary_unary,
                pb2.GetServerSocketsRequest, pb2.GetServerSocketsResponse,
            ),
            '/grpc.channelz.v1.Channelz/GetChannel': handler(
                self.GetChannel, unary_unary,
                pb2.GetChannelRequest, pb2.GetChannelResponse,
            ),
            '/grpc.channelz.v1.Channelz/GetSubchannel': handler(
                self.GetSubchannel, unary_unary,
                pb2.GetSubchannelRequest, pb2.GetSubchannelResponse,
            ),
            '/grpc.channelz.v1.Channelz/GetSocket': handler(
                self.GetSocket, unary_unary,
                pb2.GetSocketRequest, pb2.GetSocketResponse,
            ),
        }
class ChannelzStub:
    """Client-side stub for the ``grpc.channelz.v1.Channelz`` service.

    Each RPC is exposed as an instance attribute holding a
    ``grpclib.client.UnaryUnaryMethod`` bound to the supplied channel.
    """

    def __init__(self, channel: grpclib.client.Channel) -> None:
        """Create one ``UnaryUnaryMethod`` per Channelz RPC on ``channel``."""
        # Short local name for the generated message module used by every RPC.
        pb2 = grpclib.channelz.v1.channelz_pb2

        self.GetTopChannels = grpclib.client.UnaryUnaryMethod(
            channel, '/grpc.channelz.v1.Channelz/GetTopChannels',
            pb2.GetTopChannelsRequest, pb2.GetTopChannelsResponse,
        )
        self.GetServers = grpclib.client.UnaryUnaryMethod(
            channel, '/grpc.channelz.v1.Channelz/GetServers',
            pb2.GetServersRequest, pb2.GetServersResponse,
        )
        self.GetServer = grpclib.client.UnaryUnaryMethod(
            channel, '/grpc.channelz.v1.Channelz/GetServer',
            pb2.GetServerRequest, pb2.GetServerResponse,
        )
        self.GetServerSockets = grpclib.client.UnaryUnaryMethod(
            channel, '/grpc.channelz.v1.Channelz/GetServerSockets',
            pb2.GetServerSocketsRequest, pb2.GetServerSocketsResponse,
        )
        self.GetChannel = grpclib.client.UnaryUnaryMethod(
            channel, '/grpc.channelz.v1.Channelz/GetChannel',
            pb2.GetChannelRequest, pb2.GetChannelResponse,
        )
        self.GetSubchannel = grpclib.client.UnaryUnaryMethod(
            channel, '/grpc.channelz.v1.Channelz/GetSubchannel',
            pb2.GetSubchannelRequest, pb2.GetSubchannelResponse,
        )
        self.GetSocket = grpclib.client.UnaryUnaryMethod(
            channel, '/grpc.channelz.v1.Channelz/GetSocket',
            pb2.GetSocketRequest, pb2.GetSocketResponse,
        )
|