1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
|
package agent
import (
"time"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modagent"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modshared"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/reverse_tunnel"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/reverse_tunnel/info"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/reverse_tunnel/rpc"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/grpctool"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/retry"
"google.golang.org/grpc"
)
// Tunnel connection pool sizing. These values are copied into the module
// built by Factory.New.
const (
	minIdleConnections = 2
	maxConnections     = 500
	maxIdleTime        = time.Minute
	// scaleUpStep defines how many new connections are started when there are not enough idle connections.
	scaleUpStep = 10
)

// Parameters of the exponential backoff factory that Factory.New hands to the
// poll configuration shared by all tunnel connections.
const (
	connectionInitBackoff   = time.Second
	connectionMaxBackoff    = 20 * time.Second
	connectionResetDuration = 25 * time.Second
	connectionBackoffFactor = 1.6
	connectionJitter        = 0.2
)
// Factory constructs the reverse tunnel module for the agent (see New).
type Factory struct {
// InternalServerConn is passed to every tunnel connection created by the
// module as its internalServerConn.
InternalServerConn grpc.ClientConnInterface
}
// New builds the reverse tunnel module from the supplied module configuration.
//
// It prepares three objects that are shared by every connection the module
// creates later on: a stream visitor for rpc.ConnectResponse messages, a
// reverse tunnel client on top of config.KasConn, and a poll configuration
// factory backed by exponential backoff. The returned module is seeded with
// the pool sizing constants of this package and a constructor for new
// connections.
//
// An error is returned only when the stream visitor cannot be constructed.
func (f *Factory) New(config *modagent.Config) (modagent.Module, error) {
	visitor, err := grpctool.NewStreamVisitor(&rpc.ConnectResponse{})
	if err != nil {
		return nil, err
	}

	tunnelClient := rpc.NewReverseTunnelClient(config.KasConn)

	backoff := retry.NewExponentialBackoffFactory(
		connectionInitBackoff,
		connectionMaxBackoff,
		connectionResetDuration,
		connectionBackoffFactor,
		connectionJitter,
	)
	pollCfg := retry.NewPollConfigFactory(0, backoff)

	// newConnection assembles a single tunnel connection. The logger and the
	// internal server connection are read at call time, not captured up front.
	newConnection := func(descriptor *info.AgentDescriptor, onActive, onIdle func(c connectionInterface)) connectionInterface {
		conn := &connection{
			log:                config.Log,
			descriptor:         descriptor,
			client:             tunnelClient,
			internalServerConn: f.InternalServerConn,
			streamVisitor:      visitor,
			pollConfig:         pollCfg,
			onActive:           onActive,
			onIdle:             onIdle,
		}
		return conn
	}

	m := &module{
		server:             config.Server,
		minIdleConnections: minIdleConnections,
		maxConnections:     maxConnections,
		scaleUpStep:        scaleUpStep,
		maxIdleTime:        maxIdleTime,
		connectionFactory:  newConnection,
	}
	return m, nil
}
// Name returns the name of the module this factory creates, which is
// reverse_tunnel.ModuleName.
func (f *Factory) Name() string {
return reverse_tunnel.ModuleName
}
// StartStopPhase reports the start/stop phase of the module, which is always
// modshared.ModuleStartAfterServers.
// NOTE(review): presumably this means the module is started only after the
// servers are up; confirm against the modshared package.
func (f *Factory) StartStopPhase() modshared.ModuleStartStopPhase {
return modshared.ModuleStartAfterServers
}
|