1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
|
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"fmt"
"net"
"net/url"
"os"
"strconv"
"time"
"github.com/google/uuid"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/pkg/agent"
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
"sigs.k8s.io/apiserver-network-proxy/proto/header"
)
type GrpcProxyAgentOptions struct {
// Configuration for authenticating with the proxy-server
AgentCert string
AgentKey string
CaCert string
// Configuration for connecting to the proxy-server
ProxyServerHost string
ProxyServerPort int
AlpnProtos []string
// Bind address for the health connections.
HealthServerHost string
// Port we listen for health connections on.
HealthServerPort int
// Bind address for the admin connections.
AdminBindAddress string
// Port we listen for admin connections on.
AdminServerPort int
// Enables pprof at host:adminPort/debug/pprof.
EnableProfiling bool
// If EnableProfiling is true, this enables the lock contention
// profiling at host:adminPort/debug/pprof/block.
EnableContentionProfiling bool
AgentID string
AgentIdentifiers string
SyncInterval time.Duration
ProbeInterval time.Duration
SyncIntervalCap time.Duration
// After a duration of this time if the agent doesn't see any activity it
// pings the server to see if the transport is still alive.
KeepaliveTime time.Duration
// file contains service account authorization token for enabling proxy-server token based authorization
ServiceAccountTokenPath string
// This warns if we attempt to push onto a "full" transfer channel.
// However checking that the transfer channel is full is not safe.
// It violates our race condition checking. Adding locks around a potentially
// blocking call has its own problems, so it cannot easily be made race condition safe.
// The check is an "unlocked" read but is still use at your own peril.
WarnOnChannelLimit bool
SyncForever bool
XfrChannelSize int
// Enables updating the server count by counting the number of valid leases
// matching the selector.
CountServerLeases bool
// Namespace where lease objects are managed.
LeaseNamespace string
// Labels on which lease objects are managed.
LeaseLabel string
// ServerCountSource describes how server counts should be combined.
ServerCountSource string
// Path to kubeconfig (used by kubernetes client for lease listing)
KubeconfigPath string
// Content type of requests sent to apiserver.
APIContentType string
}
func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig {
return &agent.ClientSetConfig{
Address: net.JoinHostPort(o.ProxyServerHost, strconv.Itoa(o.ProxyServerPort)),
AgentID: o.AgentID,
AgentIdentifiers: o.AgentIdentifiers,
SyncInterval: o.SyncInterval,
ProbeInterval: o.ProbeInterval,
SyncIntervalCap: o.SyncIntervalCap,
DialOptions: dialOptions,
ServiceAccountTokenPath: o.ServiceAccountTokenPath,
WarnOnChannelLimit: o.WarnOnChannelLimit,
SyncForever: o.SyncForever,
XfrChannelSize: o.XfrChannelSize,
ServerCountSource: o.ServerCountSource,
}
}
func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags := pflag.NewFlagSet("proxy-agent", pflag.ContinueOnError)
flags.StringVar(&o.AgentCert, "agent-cert", o.AgentCert, "If non-empty secure communication with this cert.")
flags.StringVar(&o.AgentKey, "agent-key", o.AgentKey, "If non-empty secure communication with this key.")
flags.StringVar(&o.CaCert, "ca-cert", o.CaCert, "If non-empty the CAs we use to validate clients.")
flags.StringVar(&o.ProxyServerHost, "proxy-server-host", o.ProxyServerHost, "The hostname to use to connect to the proxy-server.")
flags.IntVar(&o.ProxyServerPort, "proxy-server-port", o.ProxyServerPort, "The port the proxy server is listening on.")
flags.StringSliceVar(&o.AlpnProtos, "alpn-proto", o.AlpnProtos, "Additional ALPN protocols to be presented when connecting to the server. Useful to distinguish between network proxy and apiserver connections that share the same destination address.")
flags.StringVar(&o.HealthServerHost, "health-server-host", o.HealthServerHost, "The host address to listen on, without port.")
flags.IntVar(&o.HealthServerPort, "health-server-port", o.HealthServerPort, "The port the health server is listening on.")
flags.IntVar(&o.AdminServerPort, "admin-server-port", o.AdminServerPort, "The port the admin server is listening on.")
flags.StringVar(&o.AdminBindAddress, "admin-bind-address", o.AdminBindAddress, "Bind address for admin connections. If empty, we will bind to all interfaces.")
flags.BoolVar(&o.EnableProfiling, "enable-profiling", o.EnableProfiling, "enable pprof at host:admin-port/debug/pprof")
flags.BoolVar(&o.EnableContentionProfiling, "enable-contention-profiling", o.EnableContentionProfiling, "enable contention profiling at host:admin-port/debug/pprof/block. \"--enable-profiling\" must also be set.")
flags.StringVar(&o.AgentID, "agent-id", o.AgentID, "The unique ID of this agent. Can also be set by the 'PROXY_AGENT_ID' environment variable. Default to a generated uuid if not set.")
flags.DurationVar(&o.SyncInterval, "sync-interval", o.SyncInterval, "The initial interval by which the agent periodically checks if it has connections to all instances of the proxy server.")
flags.DurationVar(&o.ProbeInterval, "probe-interval", o.ProbeInterval, "The interval by which the agent periodically checks if its connections to the proxy server are ready.")
flags.DurationVar(&o.SyncIntervalCap, "sync-interval-cap", o.SyncIntervalCap, "The maximum interval for the SyncInterval to back off to when unable to connect to the proxy server")
flags.DurationVar(&o.KeepaliveTime, "keepalive-time", o.KeepaliveTime, "Time for gRPC agent server keepalive.")
flags.StringVar(&o.ServiceAccountTokenPath, "service-account-token-path", o.ServiceAccountTokenPath, "If non-empty proxy agent uses this token to prove its identity to the proxy server.")
flags.StringVar(&o.AgentIdentifiers, "agent-identifiers", o.AgentIdentifiers, "Identifiers of the agent that will be used by the server when choosing agent. N.B. the list of identifiers must be in URL encoded format. e.g.,host=localhost&host=node1.mydomain.com&cidr=127.0.0.1/16&ipv4=1.2.3.4&ipv4=5.6.7.8&ipv6=:::::&default-route=true")
flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.")
flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.")
flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.")
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "Namespace where lease objects are managed.")
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
flags.StringVar(&o.ServerCountSource, "server-count-source", o.ServerCountSource, "Defines how the server counts from lease and from server responses are combined. Possible values: 'default' to use only one source (server or leases depending on other flags), 'max' to take the larger value.")
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file")
flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.")
return flags
}
func (o *GrpcProxyAgentOptions) Print() {
klog.V(1).Infof("AgentCert set to %q.\n", o.AgentCert)
klog.V(1).Infof("AgentKey set to %q.\n", o.AgentKey)
klog.V(1).Infof("CACert set to %q.\n", o.CaCert)
klog.V(1).Infof("ProxyServerHost set to %q.\n", o.ProxyServerHost)
klog.V(1).Infof("ProxyServerPort set to %d.\n", o.ProxyServerPort)
klog.V(1).Infof("ALPNProtos set to %+s.\n", o.AlpnProtos)
klog.V(1).Infof("HealthServerHost set to %s\n", o.HealthServerHost)
klog.V(1).Infof("HealthServerPort set to %d.\n", o.HealthServerPort)
klog.V(1).Infof("Admin bind address set to %q.\n", o.AdminBindAddress)
klog.V(1).Infof("AdminServerPort set to %d.\n", o.AdminServerPort)
klog.V(1).Infof("EnableProfiling set to %v.\n", o.EnableProfiling)
klog.V(1).Infof("EnableContentionProfiling set to %v.\n", o.EnableContentionProfiling)
klog.V(1).Infof("AgentID set to %s.\n", o.AgentID)
klog.V(1).Infof("SyncInterval set to %v.\n", o.SyncInterval)
klog.V(1).Infof("ProbeInterval set to %v.\n", o.ProbeInterval)
klog.V(1).Infof("SyncIntervalCap set to %v.\n", o.SyncIntervalCap)
klog.V(1).Infof("Keepalive time set to %v.\n", o.KeepaliveTime)
klog.V(1).Infof("ServiceAccountTokenPath set to %q.\n", o.ServiceAccountTokenPath)
klog.V(1).Infof("AgentIdentifiers set to %s.\n", util.PrettyPrintURL(o.AgentIdentifiers))
klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit)
klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever)
klog.V(1).Infof("CountServerLeases set to %v.\n", o.CountServerLeases)
klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace)
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
klog.V(1).Infof("ServerCountSource set to %s.\n", o.ServerCountSource)
klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize)
klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType)
}
func (o *GrpcProxyAgentOptions) Validate() error {
if o.AgentKey != "" {
if _, err := os.Stat(o.AgentKey); os.IsNotExist(err) {
return fmt.Errorf("error checking agent key %s, got %v", o.AgentKey, err)
}
if o.AgentCert == "" {
return fmt.Errorf("cannot have agent cert empty when agent key is set to \"%s\"", o.AgentKey)
}
}
if o.AgentCert != "" {
if _, err := os.Stat(o.AgentCert); os.IsNotExist(err) {
return fmt.Errorf("error checking agent cert %s, got %v", o.AgentCert, err)
}
if o.AgentKey == "" {
return fmt.Errorf("cannot have agent key empty when agent cert is set to \"%s\"", o.AgentCert)
}
}
if o.CaCert != "" {
if _, err := os.Stat(o.CaCert); os.IsNotExist(err) {
return fmt.Errorf("error checking agent CA cert %s, got %v", o.CaCert, err)
}
}
if o.ProxyServerPort <= 0 {
return fmt.Errorf("proxy server port %d must be greater than 0", o.ProxyServerPort)
}
if o.HealthServerPort <= 0 {
return fmt.Errorf("health server port %d must be greater than 0", o.HealthServerPort)
}
if o.AdminServerPort <= 0 {
return fmt.Errorf("admin server port %d must be greater than 0", o.AdminServerPort)
}
if o.XfrChannelSize <= 0 {
return fmt.Errorf("channel size %d must be greater than 0", o.XfrChannelSize)
}
if o.EnableContentionProfiling && !o.EnableProfiling {
return fmt.Errorf("if --enable-contention-profiling is set, --enable-profiling must also be set")
}
if o.SyncInterval > o.SyncIntervalCap {
return fmt.Errorf("sync interval %v must be less than sync interval cap %v", o.SyncInterval, o.SyncIntervalCap)
}
if o.ServiceAccountTokenPath != "" {
if _, err := os.Stat(o.ServiceAccountTokenPath); os.IsNotExist(err) {
return fmt.Errorf("error checking service account token path %s, got %v", o.ServiceAccountTokenPath, err)
}
}
if err := validateAgentIdentifiers(o.AgentIdentifiers); err != nil {
return fmt.Errorf("agent address is invalid: %v", err)
}
if o.KubeconfigPath != "" {
if _, err := os.Stat(o.KubeconfigPath); os.IsNotExist(err) {
return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err)
}
}
// Validate labels provided.
if o.CountServerLeases {
_, err := util.ParseLabels(o.LeaseLabel)
if err != nil {
return err
}
}
if o.ServerCountSource != "" {
if o.ServerCountSource != "default" && o.ServerCountSource != "max" {
return fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got %v", o.ServerCountSource)
}
}
return nil
}
func validateAgentIdentifiers(agentIdentifiers string) error {
decoded, err := url.ParseQuery(agentIdentifiers)
if err != nil {
return err
}
for idType := range decoded {
switch header.IdentifierType(idType) {
case header.IPv4:
case header.IPv6:
case header.CIDR:
case header.Host:
case header.DefaultRoute:
default:
return fmt.Errorf("unknown address type: %s", idType)
}
}
return nil
}
func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
o := GrpcProxyAgentOptions{
AgentCert: "",
AgentKey: "",
CaCert: "",
ProxyServerHost: "127.0.0.1",
ProxyServerPort: 8091,
HealthServerHost: "",
HealthServerPort: 8093,
AdminBindAddress: "127.0.0.1",
AdminServerPort: 8094,
EnableProfiling: false,
EnableContentionProfiling: false,
AgentID: defaultAgentID(),
AgentIdentifiers: "",
SyncInterval: 1 * time.Second,
ProbeInterval: 1 * time.Second,
SyncIntervalCap: 10 * time.Second,
KeepaliveTime: 1 * time.Hour,
ServiceAccountTokenPath: "",
WarnOnChannelLimit: false,
SyncForever: false,
XfrChannelSize: 150,
CountServerLeases: false,
LeaseNamespace: "kube-system",
LeaseLabel: "k8s-app=konnectivity-server",
ServerCountSource: "default",
KubeconfigPath: "",
APIContentType: runtime.ContentTypeProtobuf,
}
return &o
}
func defaultAgentID() string {
// Default to the value set by the PROXY_AGENT_ID environment variable. If both the flag &
// environment variable are set, the flag always wins.
if id := os.Getenv("PROXY_AGENT_ID"); id != "" {
return id
}
return uuid.New().String()
}
|