1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
// FIXME(thaJeztah): remove once we are a module; the go:build directive prevents go from downgrading language version to go1.16:
//go:build go1.22
package grpc // import "github.com/docker/docker/api/server/router/grpc"
import (
"context"
"fmt"
"os"
"strings"
"github.com/containerd/containerd/defaults"
"github.com/containerd/log"
"github.com/docker/docker/api/server/router"
"github.com/docker/docker/internal/otelutil"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/stack"
"github.com/moby/buildkit/util/tracing"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/net/http2"
"google.golang.org/grpc"
)
type grpcRouter struct {
routes []router.Route
grpcServer *grpc.Server
h2Server *http2.Server
}
// NewRouter initializes a new grpc http router
func NewRouter(backends ...Backend) router.Router {
tp, _ := otelutil.NewTracerProvider(context.Background(), false)
opts := []grpc.ServerOption{
grpc.StatsHandler(tracing.ServerStatsHandler(otelgrpc.WithTracerProvider(tp))),
grpc.ChainUnaryInterceptor(unaryInterceptor, grpcerrors.UnaryServerInterceptor),
grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor),
grpc.MaxRecvMsgSize(defaults.DefaultMaxRecvMsgSize),
grpc.MaxSendMsgSize(defaults.DefaultMaxSendMsgSize),
}
r := &grpcRouter{
h2Server: &http2.Server{},
grpcServer: grpc.NewServer(opts...),
}
for _, b := range backends {
b.RegisterGRPC(r.grpcServer)
}
r.initRoutes()
return r
}
// Routes returns the available routers to the session controller
func (gr *grpcRouter) Routes() []router.Route {
return gr.routes
}
func (gr *grpcRouter) initRoutes() {
gr.routes = []router.Route{
router.NewPostRoute("/grpc", gr.serveGRPC),
}
}
func unaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
// This method is used by the clients to send their traces to buildkit so they can be included
// in the daemon trace and stored in the build history record. This method can not be traced because
// it would cause an infinite loop.
if strings.HasSuffix(info.FullMethod, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
return handler(ctx, req)
}
resp, err = handler(ctx, req)
if err != nil {
log.G(ctx).WithError(err).Error(info.FullMethod)
if log.GetLevel() >= log.DebugLevel {
fmt.Fprintf(os.Stderr, "%+v", stack.Formatter(grpcerrors.FromGRPC(err)))
}
}
return resp, err
}
|