package client

import (
	"context"
	"io"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/listenmux"
	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// SidechannelRegistry associates sidechannel callbacks with outbound
// gRPC calls.
type SidechannelRegistry struct {
	registry *sidechannel.Registry
	logger   *logrus.Entry
}

// NewSidechannelRegistry returns a new registry.
func NewSidechannelRegistry(logger *logrus.Entry) *SidechannelRegistry {
	return &SidechannelRegistry{
		registry: sidechannel.NewRegistry(),
		logger:   logger,
	}
}

// Register registers a callback. It adds metadata to ctx and returns the
// new context. The caller must use the new context for the gRPC call.
// The caller must also Close() the returned SidechannelWaiter to prevent resource
// leaks.
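//
// A minimal usage sketch, not taken from this file: callRPC is a
// hypothetical stand-in for any gRPC call that supports sidechannels,
// and registry is assumed to be a *SidechannelRegistry.
//
//	ctx, waiter := registry.Register(ctx, func(conn SidechannelConn) error {
//		// Exchange raw bytes with the server over conn here.
//		return nil
//	})
//	rpcErr := callRPC(ctx)    // must use the ctx returned by Register
//	waitErr := waiter.Close() // always close, even if the RPC failed
//	if rpcErr != nil {
//		return rpcErr
//	}
//	return waitErr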
func (sr *SidechannelRegistry) Register(
	ctx context.Context,
	callback func(SidechannelConn) error,
) (context.Context, *SidechannelWaiter) {
	ctx, waiter := sidechannel.RegisterSidechannel(
		ctx,
		sr.registry,
		func(cc *sidechannel.ClientConn) error { return callback(cc) },
	)
	return ctx, &SidechannelWaiter{waiter: waiter}
}

// SidechannelWaiter represents a pending sidechannel and its callback.
type SidechannelWaiter struct{ waiter *sidechannel.Waiter }

// Close de-registers the sidechannel callback. If the callback is still
// running, Close blocks until it is done and returns the error return
// value of the callback. If the callback has not been called yet, Close
// returns an error immediately.
func (w *SidechannelWaiter) Close() error { return w.waiter.Close() }

// SidechannelConn allows a client to read and write bytes with less
// overhead than doing so via gRPC messages.
type SidechannelConn interface {
	io.ReadWriter

	// CloseWrite tells the server we won't write any more data. We can still
	// read data from the server after CloseWrite(). A typical use case is in
	// an RPC where the byte stream has a request/response pattern: the
	// client then uses CloseWrite() to signal the end of the request body.
	// When the client calls CloseWrite(), the server receives EOF.
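	//
	// A sketch of that pattern inside a sidechannel callback; request and
	// response are hypothetical io.Reader and io.Writer values:
	//
	//	if _, err := io.Copy(conn, request); err != nil {
	//		return err
	//	}
	//	if err := conn.CloseWrite(); err != nil { // server now sees EOF
	//		return err
	//	}
	//	_, err := io.Copy(response, conn) // read the response until EOF
	//	return err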
	CloseWrite() error
}

// TestSidechannelServer allows downstream consumers of this package to
// create mock sidechannel gRPC servers.
func TestSidechannelServer(
	logger *logrus.Entry,
	creds credentials.TransportCredentials,
	handler func(interface{}, grpc.ServerStream, io.ReadWriteCloser) error,
) []grpc.ServerOption {
	return []grpc.ServerOption{
		SidechannelServer(logger, creds),
		grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error {
			conn, err := OpenServerSidechannel(stream.Context())
			if err != nil {
				return err
			}
			defer conn.Close()

			return handler(srv, stream, conn)
		}),
	}
}

// SidechannelServer adds sidechannel support to a gRPC server.
func SidechannelServer(logger *logrus.Entry, creds credentials.TransportCredentials) grpc.ServerOption {
	lm := listenmux.New(creds)
	lm.Register(backchannel.NewServerHandshaker(logger, backchannel.NewRegistry(), nil))
	return grpc.Creds(lm)
}

// OpenServerSidechannel opens a sidechannel on the server side. This
// only works if the server was created using SidechannelServer().
func OpenServerSidechannel(ctx context.Context) (io.ReadWriteCloser, error) {
	return sidechannel.OpenSidechannel(ctx)
}