File: lease_counter.go

package info (click to toggle)
golang-k8s-sigs-apiserver-network-proxy 0.33.0%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,068 kB
  • sloc: makefile: 220; sh: 118
file content (148 lines) | stat: -rw-r--r-- 4,622 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agent

import (
	"context"
	"errors"
	"time"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
	"k8s.io/utils/clock"
	"sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics"
	"sigs.k8s.io/apiserver-network-proxy/pkg/util"

	coordinationv1api "k8s.io/api/coordination/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
)

type ServerCounter interface {
	Count() int
}

// A ServerLeaseCounter counts leases in the k8s apiserver to determine the
// current proxy server count.
type ServerLeaseCounter struct {
	leaseLister   coordinationv1lister.LeaseLister
	selector      labels.Selector
	fallbackCount int
	pc            clock.PassiveClock
}

// NewServerLeaseCounter creates a server counter that counts valid leases that match the label
// selector and provides the fallback count (initially 0) if this fails.
func NewServerLeaseCounter(pc clock.PassiveClock, leaseLister coordinationv1lister.LeaseLister, labelSelector labels.Selector) *ServerLeaseCounter {
	return &ServerLeaseCounter{
		leaseLister:   leaseLister,
		selector:      labelSelector,
		fallbackCount: 1,
		pc:            pc,
	}
}

// Count counts the number of leases in the apiserver matching the provided
// label selector.
//
// In the event that no valid leases are found or lease listing fails, the
// fallback count is returned. This fallback count is updated upon successful
// discovery of valid leases.
func (lc *ServerLeaseCounter) Count() int {
	leases, err := lc.leaseLister.List(lc.selector)
	if err != nil {
		klog.Errorf("Could not list leases to update server count, using fallback count (%v): %v", lc.fallbackCount, err)

		return lc.fallbackCount
	}

	count := 0
	for _, lease := range leases {
		if util.IsLeaseValid(lc.pc, *lease) {
			count++
		}
	}

	// Ensure returned count is always at least 1.
	if count == 0 {
		klog.Warningf("No valid leases found, assuming server count of 1")
		count = 1
	}

	if count != lc.fallbackCount {
		lc.fallbackCount = count
	}

	return count
}

func NewLeaseInformerWithMetrics(client kubernetes.Interface, namespace string, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				start := time.Now()
				httpCode := 200
				defer func() {
					latency := time.Now().Sub(start)
					metrics.Metrics.ObserveLeaseListLatency(latency, httpCode)
				}()
				obj, err := client.CoordinationV1().Leases(namespace).List(context.TODO(), options)
				if err != nil {
					klog.Errorf("Could not list leases: %v", err)

					var apiStatus apierrors.APIStatus
					if errors.As(err, &apiStatus) {
						status := apiStatus.Status()
						httpCode = int(status.Code)
						metrics.Metrics.ObserveLeaseList(int(status.Code), string(status.Reason))
					} else {
						klog.Errorf("Lease list error could not be logged to metrics as it is not an APIStatus: %v", err)
					}
					return nil, err
				}

				metrics.Metrics.ObserveLeaseList(200, "")
				return obj, nil
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				obj, err := client.CoordinationV1().Leases(namespace).Watch(context.TODO(), options)
				if err != nil {
					klog.Errorf("Could not watch leases: %v", err)

					var apiStatus apierrors.APIStatus
					if errors.As(err, &apiStatus) {
						status := apiStatus.Status()
						metrics.Metrics.ObserveLeaseWatch(int(status.Code), string(status.Reason))
					} else {
						klog.Errorf("Lease watch error could not be logged to metrics as it is not an APIStatus: %v", err)
					}
					return nil, err
				}

				metrics.Metrics.ObserveLeaseWatch(200, "")
				return obj, nil
			},
		},
		&coordinationv1api.Lease{},
		resyncPeriod,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
}