1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
// Package test contains test utilities.
package test
import (
"context"
"fmt"
"log"
"net"
"net/http"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/envoyproxy/go-control-plane/pkg/test/v3"
gcplogger "github.com/envoyproxy/go-control-plane/pkg/log"
)
// Tuning values applied to the gRPC management server.
const (
	// grpcKeepaliveTime is passed as keepalive.ServerParameters.Time.
	grpcKeepaliveTime = 30 * time.Second
	// grpcKeepaliveTimeout is passed as keepalive.ServerParameters.Timeout.
	grpcKeepaliveTimeout = 5 * time.Second
	// grpcKeepaliveMinTime is passed as keepalive.EnforcementPolicy.MinTime.
	grpcKeepaliveMinTime = 30 * time.Second
	// grpcMaxConcurrentStreams is the per-connection stream limit handed to
	// grpc.MaxConcurrentStreams.
	grpcMaxConcurrentStreams = 1_000_000
)
// HTTPGateway is a custom implementation of
// [gRPC gateway](https://github.com/grpc-ecosystem/grpc-gateway)
// specialized to Envoy xDS API. It adapts the wrapped Gateway to the
// net/http handler interface.
type HTTPGateway struct {
	// Log is an optional logger; when non-nil it receives errors that occur
	// while writing the response body.
	Log gcplogger.Logger

	// Gateway handles each incoming request and yields the response bytes,
	// the HTTP status code and an error.
	Gateway server.HTTPGateway
}
// RunAccessLogServer starts an accesslog server.
//
// A new gRPC server is created, a TCP listener is opened on alsPort, and als
// is registered on the server, which then serves in a background goroutine.
// The call blocks until ctx is done and then gracefully stops the server.
// If the listener cannot be opened the process is terminated via log.Fatal.
func RunAccessLogServer(ctx context.Context, als *test.AccessLogService, alsPort uint) {
	grpcServer := grpc.NewServer()

	addr := fmt.Sprintf(":%d", alsPort)
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatal(err)
	}

	test.RegisterAccessLogServer(grpcServer, als)
	log.Printf("access log server listening on %d\n", alsPort)

	// Serve in the background so this goroutine can wait for cancellation.
	go func() {
		serveErr := grpcServer.Serve(lis)
		if serveErr != nil {
			log.Println(serveErr)
		}
	}()

	<-ctx.Done()
	grpcServer.GracefulStop()
}
// RunManagementServer starts an xDS server at the given port.
//
// The gRPC server is configured with a raised concurrent-stream limit and
// keepalive settings, srv is registered on it, and it serves in a background
// goroutine. The call blocks until ctx is done and then gracefully stops the
// server. If the listener cannot be opened the process is terminated via
// log.Fatal.
func RunManagementServer(ctx context.Context, srv server.Server, port uint) {
	// The gRPC golang library sets a very small upper bound for the number of
	// gRPC/h2 streams over a single TCP connection. If a proxy multiplexes
	// requests over a single connection to the management server, that bound
	// might lead to availability problems, so it is raised here.
	// Keepalive timeouts are based on the connection_keepalive parameter:
	// https://www.envoyproxy.io/docs/envoy/latest/configuration/overview/examples#dynamic
	serverParams := keepalive.ServerParameters{
		Time:    grpcKeepaliveTime,
		Timeout: grpcKeepaliveTimeout,
	}
	enforcement := keepalive.EnforcementPolicy{
		MinTime:             grpcKeepaliveMinTime,
		PermitWithoutStream: true,
	}
	grpcOptions := []grpc.ServerOption{
		grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
		grpc.KeepaliveParams(serverParams),
		grpc.KeepaliveEnforcementPolicy(enforcement),
	}
	grpcServer := grpc.NewServer(grpcOptions...)

	addr := fmt.Sprintf(":%d", port)
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatal(err)
	}

	test.RegisterServer(grpcServer, srv)
	log.Printf("management server listening on %d\n", port)

	// Serve in the background so this goroutine can wait for cancellation.
	go func() {
		serveErr := grpcServer.Serve(lis)
		if serveErr != nil {
			log.Println(serveErr)
		}
	}()

	<-ctx.Done()
	grpcServer.GracefulStop()
}
// RunManagementGateway starts an HTTP gateway to an xDS server.
//
// The gateway serves HTTP/1.1 on the given port in a background goroutine,
// handing every request to an HTTPGateway that wraps srv. The call blocks
// until ctx is done and then shuts the HTTP server down, allowing in-flight
// requests a bounded grace period to finish.
func RunManagementGateway(ctx context.Context, srv server.Server, port uint) {
	// Upper bound on how long shutdown waits for in-flight requests.
	const shutdownTimeout = 5 * time.Second

	log.Printf("gateway listening HTTP/1.1 on %d\n", port)

	// Ignore: G114: Use of net/http serve function that has no support for setting timeouts
	// nolint:gosec
	httpServer := &http.Server{
		Addr: fmt.Sprintf(":%d", port),
		Handler: &HTTPGateway{
			Gateway: server.HTTPGateway{Server: srv},
		},
	}

	go func() {
		// ListenAndServe always returns a non-nil error. After Shutdown it
		// returns http.ErrServerClosed, which is the normal exit path and
		// must not be reported as a listen failure.
		if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Printf("failed to start listening: %s", err)
		}
	}()

	<-ctx.Done()

	// Cleanup our gateway if we receive a shutdown. ctx is already done at
	// this point, so handing it to Shutdown would make Shutdown return
	// ctx.Err() without waiting for active connections. Use a fresh context
	// with its own deadline instead.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()
	if err := httpServer.Shutdown(shutdownCtx); err != nil {
		log.Printf("failed to shut down: %s", err)
	}
}
// ServeHTTP implements http.Handler by delegating the request to the wrapped
// Gateway and translating its result into an HTTP response:
//   - on error, the error text is sent with the status code the gateway returned;
//   - on a nil body, 304 Not Modified is sent with no body;
//   - otherwise the body is written, and a write failure is reported to Log
//     when a logger is configured.
func (h *HTTPGateway) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	body, status, err := h.Gateway.ServeHTTP(req)

	switch {
	case err != nil:
		http.Error(resp, err.Error(), status)
	case body == nil:
		resp.WriteHeader(http.StatusNotModified)
	default:
		_, writeErr := resp.Write(body)
		if writeErr != nil && h.Log != nil {
			h.Log.Errorf("gateway error: %v", writeErr)
		}
	}
}
|