1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
package grpc
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/transport"
"github.com/go-kit/log"
)
// Handler which should be called from the gRPC binding of the service
// implementation. The incoming request parameter, and returned response
// parameter, are both gRPC types, not user-domain.
type Handler interface {
ServeGRPC(ctx context.Context, request interface{}) (context.Context, interface{}, error)
}
// Server wraps an endpoint and implements grpc.Handler.
type Server struct {
e endpoint.Endpoint
dec DecodeRequestFunc
enc EncodeResponseFunc
before []ServerRequestFunc
after []ServerResponseFunc
finalizer []ServerFinalizerFunc
errorHandler transport.ErrorHandler
}
// NewServer constructs a new server, which implements wraps the provided
// endpoint and implements the Handler interface. Consumers should write
// bindings that adapt the concrete gRPC methods from their compiled protobuf
// definitions to individual handlers. Request and response objects are from the
// caller business domain, not gRPC request and reply types.
func NewServer(
e endpoint.Endpoint,
dec DecodeRequestFunc,
enc EncodeResponseFunc,
options ...ServerOption,
) *Server {
s := &Server{
e: e,
dec: dec,
enc: enc,
errorHandler: transport.NewLogErrorHandler(log.NewNopLogger()),
}
for _, option := range options {
option(s)
}
return s
}
// ServerOption sets an optional parameter for servers.
type ServerOption func(*Server)
// ServerBefore functions are executed on the gRPC request object before the
// request is decoded.
func ServerBefore(before ...ServerRequestFunc) ServerOption {
return func(s *Server) { s.before = append(s.before, before...) }
}
// ServerAfter functions are executed on the gRPC response writer after the
// endpoint is invoked, but before anything is written to the client.
func ServerAfter(after ...ServerResponseFunc) ServerOption {
return func(s *Server) { s.after = append(s.after, after...) }
}
// ServerErrorLogger is used to log non-terminal errors. By default, no errors
// are logged.
// Deprecated: Use ServerErrorHandler instead.
func ServerErrorLogger(logger log.Logger) ServerOption {
return func(s *Server) { s.errorHandler = transport.NewLogErrorHandler(logger) }
}
// ServerErrorHandler is used to handle non-terminal errors. By default, non-terminal errors
// are ignored.
func ServerErrorHandler(errorHandler transport.ErrorHandler) ServerOption {
return func(s *Server) { s.errorHandler = errorHandler }
}
// ServerFinalizer is executed at the end of every gRPC request.
// By default, no finalizer is registered.
func ServerFinalizer(f ...ServerFinalizerFunc) ServerOption {
return func(s *Server) { s.finalizer = append(s.finalizer, f...) }
}
// ServeGRPC implements the Handler interface.
func (s Server) ServeGRPC(ctx context.Context, req interface{}) (retctx context.Context, resp interface{}, err error) {
// Retrieve gRPC metadata.
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.MD{}
}
if len(s.finalizer) > 0 {
defer func() {
for _, f := range s.finalizer {
f(ctx, err)
}
}()
}
for _, f := range s.before {
ctx = f(ctx, md)
}
var (
request interface{}
response interface{}
grpcResp interface{}
)
request, err = s.dec(ctx, req)
if err != nil {
s.errorHandler.Handle(ctx, err)
return ctx, nil, err
}
response, err = s.e(ctx, request)
if err != nil {
s.errorHandler.Handle(ctx, err)
return ctx, nil, err
}
var mdHeader, mdTrailer metadata.MD
for _, f := range s.after {
ctx = f(ctx, &mdHeader, &mdTrailer)
}
grpcResp, err = s.enc(ctx, response)
if err != nil {
s.errorHandler.Handle(ctx, err)
return ctx, nil, err
}
if len(mdHeader) > 0 {
if err = grpc.SendHeader(ctx, mdHeader); err != nil {
s.errorHandler.Handle(ctx, err)
return ctx, nil, err
}
}
if len(mdTrailer) > 0 {
if err = grpc.SetTrailer(ctx, mdTrailer); err != nil {
s.errorHandler.Handle(ctx, err)
return ctx, nil, err
}
}
return ctx, grpcResp, nil
}
// ServerFinalizerFunc can be used to perform work at the end of an gRPC
// request, after the response has been written to the client.
type ServerFinalizerFunc func(ctx context.Context, err error)
// Interceptor is a grpc UnaryInterceptor that injects the method name into
// context so it can be consumed by Go kit gRPC middlewares. The Interceptor
// typically is added at creation time of the grpc-go server.
// Like this: `grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))`
func Interceptor(
ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (resp interface{}, err error) {
ctx = context.WithValue(ctx, ContextKeyRequestMethod, info.FullMethod)
return handler(ctx, req)
}
|