1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agent
import (
"context"
"errors"
"time"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics"
"sigs.k8s.io/apiserver-network-proxy/pkg/util"
coordinationv1api "k8s.io/api/coordination/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
)
// ServerCounter is implemented by types that can report a server count.
type ServerCounter interface {
// Count returns the current server count.
Count() int
}
// A ServerLeaseCounter counts leases in the k8s apiserver to determine the
// current proxy server count.
type ServerLeaseCounter struct {
// leaseLister is the lister that Count queries for leases.
leaseLister coordinationv1lister.LeaseLister
// selector is the label selector passed to leaseLister when listing.
selector labels.Selector
// fallbackCount is returned by Count when listing fails; Count stores each
// count it computes here.
fallbackCount int
// pc is the clock handed to util.IsLeaseValid when checking each lease.
pc clock.PassiveClock
}
// NewServerLeaseCounter builds a ServerLeaseCounter that lists leases through
// leaseLister, restricted to those matching labelSelector, and uses pc when
// checking whether a lease is valid.
//
// The fallback count, which Count returns when listing fails, starts at 1.
func NewServerLeaseCounter(pc clock.PassiveClock, leaseLister coordinationv1lister.LeaseLister, labelSelector labels.Selector) *ServerLeaseCounter {
	counter := new(ServerLeaseCounter)
	counter.pc = pc
	counter.leaseLister = leaseLister
	counter.selector = labelSelector
	counter.fallbackCount = 1
	return counter
}
// Count returns the number of valid leases that match the counter's label
// selector. The result is never less than 1.
//
// If listing leases fails, the error is logged and the fallback count is
// returned instead. If listing succeeds but no lease is valid, a warning is
// logged and 1 is returned. Every successfully computed result is stored as
// the new fallback count.
func (lc *ServerLeaseCounter) Count() int {
	leases, err := lc.leaseLister.List(lc.selector)
	if err != nil {
		klog.Errorf("Could not list leases to update server count, using fallback count (%v): %v", lc.fallbackCount, err)
		return lc.fallbackCount
	}

	valid := 0
	for i := range leases {
		if !util.IsLeaseValid(lc.pc, *leases[i]) {
			continue
		}
		valid++
	}

	// A server count of zero is never reported; assume a single server.
	if valid == 0 {
		klog.Warningf("No valid leases found, assuming server count of 1")
		valid = 1
	}

	// Remember the result so that it can be served if a later listing fails.
	if lc.fallbackCount != valid {
		lc.fallbackCount = valid
	}
	return valid
}
// NewLeaseInformerWithMetrics returns a shared index informer for Lease
// objects in namespace whose list and watch calls are reported to the agent
// metrics.
//
// Each list call records its latency together with an HTTP status code, and
// each list or watch call records its status code and reason. Errors that do
// not carry an APIStatus are only logged, because no status code can be
// extracted from them. resyncPeriod is passed through to the informer, which
// is indexed by namespace.
func NewLeaseInformerWithMetrics(client kubernetes.Interface, namespace string, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				start := time.Now()
				// httpCode is read by the deferred latency observation, so the
				// error path below must update it before returning.
				// NOTE(review): errors that are not an APIStatus leave this at
				// 200, so their latency is recorded under code 200 - confirm
				// this is intended.
				httpCode := 200
				defer func() {
					metrics.Metrics.ObserveLeaseListLatency(time.Since(start), httpCode)
				}()
				obj, err := client.CoordinationV1().Leases(namespace).List(context.TODO(), options)
				if err != nil {
					klog.Errorf("Could not list leases: %v", err)
					var apiStatus apierrors.APIStatus
					if errors.As(err, &apiStatus) {
						status := apiStatus.Status()
						httpCode = int(status.Code)
						metrics.Metrics.ObserveLeaseList(int(status.Code), string(status.Reason))
					} else {
						klog.Errorf("Lease list error could not be logged to metrics as it is not an APIStatus: %v", err)
					}
					return nil, err
				}
				metrics.Metrics.ObserveLeaseList(200, "")
				return obj, nil
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				obj, err := client.CoordinationV1().Leases(namespace).Watch(context.TODO(), options)
				if err != nil {
					klog.Errorf("Could not watch leases: %v", err)
					var apiStatus apierrors.APIStatus
					if errors.As(err, &apiStatus) {
						status := apiStatus.Status()
						metrics.Metrics.ObserveLeaseWatch(int(status.Code), string(status.Reason))
					} else {
						klog.Errorf("Lease watch error could not be logged to metrics as it is not an APIStatus: %v", err)
					}
					return nil, err
				}
				metrics.Metrics.ObserveLeaseWatch(200, "")
				return obj, nil
			},
		},
		&coordinationv1api.Lease{},
		resyncPeriod,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
}
|