File: the_real_fifo.go

package info (click to toggle)
golang-k8s-client-go 0.33.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 19,780 kB
  • sloc: makefile: 8; sh: 3
file content (407 lines) | stat: -rw-r--r-- 12,901 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"fmt"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	utiltrace "k8s.io/utils/trace"
	"sync"
	"time"
)

// RealFIFO is a Queue that hands every notification received from the
// Reflector to the consumer, in arrival order, via Pop.
// As a consequence it
// 1. delivers notifications for items that have been deleted
// 2. delivers every notification for an item rather than only its most recent value
type RealFIFO struct {
	// lock is taken by every method that reads or writes the fields below.
	lock sync.RWMutex
	// cond is broadcast whenever a delta is appended or the queue is closed,
	// which wakes callers blocked in Pop.
	cond sync.Cond

	// items holds the pending deltas; Pop takes from the front and new
	// deltas are appended at the back.
	items []Delta

	// populated becomes true once Replace() has inserted its first batch of
	// items, or once Add/Update/Delete has been called, whichever comes first.
	populated bool
	// initialPopulationCount is the number of items that were queued by the
	// first call of Replace() and have not been popped yet.
	initialPopulationCount int

	// keyFunc derives the key of a queued object. It should be deterministic.
	keyFunc KeyFunc

	// knownObjects lists the keys that are already "known" to the consumer;
	// it is consulted by Replace() and Resync().
	knownObjects KeyListerGetter

	// closed records that Close() was called, so that a control loop can exit
	// once the queue is empty.
	// Currently it does not gate any of the CRUD operations.
	closed bool

	// transformer, when non-nil, is applied to objects as they are enqueued.
	transformer TransformFunc
}

// Compile-time check that *RealFIFO implements Queue.
var _ Queue = (*RealFIFO)(nil)

// Close marks the queue as closed and wakes every goroutine waiting on the
// condition variable so that a blocked Pop can observe the closed state.
func (f *RealFIFO) Close() {
	f.lock.Lock()
	defer f.lock.Unlock()

	f.closed = true
	f.cond.Broadcast()
}

// keyOf computes the queue key for obj.
//
// Wrappers are peeled off first: a Deltas value is replaced by the object of
// its newest entry and a Delta value by its Object. A DeletedFinalStateUnknown
// tombstone answers with the Key it carries; anything else is handed to
// f.keyFunc. An empty Deltas yields a KeyError carrying
// ErrZeroLengthDeltasObject.
func (f *RealFIFO) keyOf(obj interface{}) (string, error) {
	if deltas, isDeltas := obj.(Deltas); isDeltas {
		if len(deltas) == 0 {
			return "", KeyError{obj, ErrZeroLengthDeltasObject}
		}
		obj = deltas.Newest().Object
	}
	if delta, isDelta := obj.(Delta); isDelta {
		obj = delta.Object
	}

	switch v := obj.(type) {
	case DeletedFinalStateUnknown:
		return v.Key, nil
	default:
		return f.keyFunc(obj)
	}
}

// HasSynced reports whether the FIFO is populated and no item from the first
// Replace() batch is still waiting to be popped. It is true right away when
// Add/Update/Delete was called before any Replace().
func (f *RealFIFO) HasSynced() bool {
	f.lock.Lock()
	synced := f.hasSynced_locked()
	f.lock.Unlock()
	return synced
}

// hasSynced_locked is the lock-free core of HasSynced; callers in this file
// invoke it while holding f.lock.
//
// The underscore in the name is kept on purpose to reduce the delta to the
// original implementation for review. It is fine to rename it later.
//
//lint:file-ignore ST1003: should not use underscores in Go names
func (f *RealFIFO) hasSynced_locked() bool {
	if !f.populated {
		return false
	}
	return f.initialPopulationCount == 0
}

// addToItems_locked appends a Delta of type deltaActionType for obj to the end
// of the queue and broadcasts on f.cond. Callers in this file hold f.lock.
//
// It returns a KeyError when no key can be computed for obj, or the
// transformer's error when the transformer fails; in both cases nothing is
// enqueued.
func (f *RealFIFO) addToItems_locked(deltaActionType DeltaType, skipTransform bool, obj interface{}) error {
	// The key itself is not needed here, but an object whose key cannot be read
	// must be rejected: Replace and Resync later compare queued keys with the
	// keys in knownObjects.
	if _, keyErr := f.keyOf(obj); keyErr != nil {
		return KeyError{obj, keyErr}
	}

	// Every object passes through here once, which makes it the natural place
	// to apply the transform func.
	//
	// DeletedFinalStateUnknown tombstones are never transformed, and callers
	// pass skipTransform=true for objects that already went through the
	// transformer.
	//
	// If objects already present in the cache are passed to Replace(), the
	// transformer must be idempotent to avoid re-mutating them, or coordinate
	// with all readers of the cache to avoid data races.
	// Default informers do not pass existing objects to Replace.
	if f.transformer != nil && !skipTransform {
		if _, isTombstone := obj.(DeletedFinalStateUnknown); !isTombstone {
			transformed, err := f.transformer(obj)
			if err != nil {
				return err
			}
			obj = transformed
		}
	}

	f.items = append(f.items, Delta{Type: deltaActionType, Object: obj})
	f.cond.Broadcast()
	return nil
}

// Add marks the FIFO as populated and appends an Added delta for obj to the
// end of the queue. No de-duplication takes place: each successful call
// enqueues one more delta. The error from enqueueing is returned as is.
func (f *RealFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()

	f.populated = true
	return f.addToItems_locked(Added, false, obj)
}

// Update behaves like Add, except that the delta appended to the queue has
// type Updated.
func (f *RealFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()

	f.populated = true
	return f.addToItems_locked(Updated, false, obj)
}

// Delete marks the FIFO as populated and appends a Deleted delta for obj to
// the end of the queue, so the deletion is delivered through Pop in order with
// every other notification. The error from enqueueing is returned as is.
func (f *RealFIFO) Delete(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()

	f.populated = true
	return f.addToItems_locked(Deleted, false, obj)
}

// IsClosed reports whether Close has been called on the queue.
func (f *RealFIFO) IsClosed() bool {
	f.lock.Lock()
	closed := f.closed
	f.lock.Unlock()
	return closed
}

// Pop blocks until a delta is available, removes the oldest one from the queue
// and passes it to process. Deltas are handed out in the order in which they
// were enqueued.
//
// The delta is removed before process runs, and process is called with the
// lock held, so it can safely update data structures that must stay in sync
// with the queue. The second argument given to process tells whether the FIFO
// had not yet synced when the delta was popped.
//
// Pop returns the popped delta wrapped in a Deltas together with the error
// returned by process. If the queue is empty and closed it returns
// ErrFIFOClosed without calling process.
func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for {
		if len(f.items) > 0 {
			break
		}
		// The queue is empty: either give up because Close() was called, or
		// sleep until an enqueue or Close() broadcasts on the condition.
		if f.closed {
			return nil, ErrFIFOClosed
		}
		f.cond.Wait()
	}

	// Must be evaluated before initialPopulationCount is decremented below.
	inInitialList := !f.hasSynced_locked()

	head := f.items[0]
	// Re-slicing keeps the backing array alive, so clear the slot to let the
	// popped object be garbage collected.
	f.items[0] = Delta{}
	f.items = f.items[1:]
	if f.initialPopulationCount > 0 {
		f.initialPopulationCount--
	}

	// A trace is only set up when more than 10 deltas remain queued, and it is
	// only logged when handling this delta takes longer than 100 milliseconds.
	// Processing holds the queue lock, so nothing can be enqueued until it
	// finishes and the depth rarely grows large.
	// https://github.com/kubernetes/kubernetes/issues/103789
	if depth := len(f.items); depth > 10 {
		// A key error is ignored on purpose: the id only labels the trace.
		id, _ := f.keyOf(head)
		trace := utiltrace.New("RealFIFO Pop Process",
			utiltrace.Field{Key: "ID", Value: id},
			utiltrace.Field{Key: "Depth", Value: depth},
			utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
		defer trace.LogIfLong(100 * time.Millisecond)
	}

	// The single delta is wrapped in Deltas for compatibility with existing
	// process functions and with callers that interpret the return value.
	procErr := process(Deltas{head}, inInitialList)
	return Deltas{head}, procErr
}

// Replace enqueues the state described by newItems. In order, it
//  1. appends a synthetic delete for every key that is queued in f.items,
//     is absent from newItems and whose newest queued delta is not a delete
//  2. appends a synthetic delete for every key in knownObjects that is neither
//     in newItems nor queued
//  3. appends a Replaced delta for every entry of newItems
//
// The first time the FIFO becomes populated through Replace, the queue length
// is recorded as the initial population count. resourceVersion is not used by
// this implementation.
func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error {
	f.lock.Lock()
	defer f.lock.Unlock()

	// Collect the keys of everything being added. The objects themselves can
	// only be enqueued after the synthetic deletes for vanished items.
	incomingKeys := sets.Set[string]{}
	for _, obj := range newItems {
		key, err := f.keyOf(obj)
		if err != nil {
			return KeyError{obj, err}
		}
		incomingKeys.Insert(key)
	}

	// Index the current queue: the keys in order of first appearance, and for
	// each key the newest delta that is queued for it.
	var queuedKeyOrder []string
	newestQueued := map[string]Delta{}
	for _, queued := range f.items {
		key, err := f.keyOf(queued.Object)
		if err != nil {
			return KeyError{queued.Object, err}
		}
		if _, dup := newestQueued[key]; !dup {
			queuedKeyOrder = append(queuedKeyOrder, key)
		}
		newestQueued[key] = queued
	}

	// Deletion detection against the queue. Deletes that are already queued
	// stay in place and matter in both cases:
	// 1. key/X has a queued delete and is in newItems: the queued UID was
	//    deleted and a new one was created.
	// 2. key/X has a queued delete and is not in newItems: the item was deleted.
	// A queued key that is missing from newItems and not yet marked deleted
	// gets a tombstone; this can happen when a watch deletion event was missed
	// while disconnected from the apiserver.
	for _, key := range queuedKeyOrder {
		newest := newestQueued[key]
		if incomingKeys.Has(key) || newest.Type == Deleted {
			continue
		}

		tombstone := DeletedFinalStateUnknown{
			Key: key,
			Obj: newest.Object,
		}
		if err := f.addToItems_locked(Deleted, true, tombstone); err != nil {
			return fmt.Errorf("couldn't enqueue object: %w", err)
		}
	}

	// Deletion detection against knownObjects, for keys that are not queued.
	for _, key := range f.knownObjects.ListKeys() {
		if incomingKeys.Has(key) {
			// still present
			continue
		}
		if _, queued := newestQueued[key]; queued {
			// queued keys were already handled by the loop above
			continue
		}

		lastKnown, exists, err := f.knownObjects.GetByKey(key)
		switch {
		case err != nil:
			lastKnown = nil
			utilruntime.HandleError(fmt.Errorf("error during lookup, placing DeleteFinalStateUnknown marker without object: key=%q, err=%w", key, err))
		case !exists:
			lastKnown = nil
			utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object: key=%q", key))
		}

		tombstone := DeletedFinalStateUnknown{
			Key: key,
			Obj: lastKnown,
		}
		if err := f.addToItems_locked(Deleted, false, tombstone); err != nil {
			return fmt.Errorf("couldn't enqueue object: %w", err)
		}
	}

	// All required deletes are queued, so the new items can follow.
	for _, obj := range newItems {
		if err := f.addToItems_locked(Replaced, false, obj); err != nil {
			return fmt.Errorf("couldn't enqueue object: %w", err)
		}
	}

	if f.populated {
		return nil
	}
	f.populated = true
	f.initialPopulationCount = len(f.items)
	return nil
}

// Resync appends a Sync delta for every object in knownObjects whose key has
// no delta waiting in the queue. It does nothing when knownObjects is nil.
//
// Keys that cannot be looked up, or that are reported as missing by
// knownObjects, are passed to utilruntime.HandleError and skipped. Resync
// returns a KeyError if the key of an already queued delta cannot be computed,
// and a wrapped error if enqueueing a Sync delta fails.
func (f *RealFIFO) Resync() error {
	// TODO this cannot logically be done by the FIFO, it can only be done by the indexer
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.knownObjects == nil {
		return nil
	}

	keysInQueue := sets.Set[string]{}
	for _, item := range f.items {
		key, err := f.keyOf(item.Object)
		if err != nil {
			return KeyError{item, err}
		}
		keysInQueue.Insert(key)
	}

	for _, knownKey := range f.knownObjects.ListKeys() {
		// If there is already an event queued for the object, skip the resync
		// for it. Queueing an event does not update the underlying store
		// (knownObjects), so a Sync built from the store could carry a value
		// older than the one already waiting in the queue.
		if keysInQueue.Has(knownKey) {
			continue
		}

		knownObj, exists, err := f.knownObjects.GetByKey(knownKey)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("unable to queue object for sync: key=%q, err=%w", knownKey, err))
			continue
		}
		if !exists {
			utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, unable to queue object for sync: key=%q", knownKey))
			continue
		}

		// The object comes from the store, so it is enqueued with
		// skipTransform=true.
		if retErr := f.addToItems_locked(Sync, true, knownObj); retErr != nil {
			// Wrap the enqueue failure itself. The lookup error err is always
			// nil on this path and would hide the real cause.
			return fmt.Errorf("couldn't queue object: %w", retErr)
		}
	}

	return nil
}

// NewRealFIFO returns an empty *RealFIFO that uses keyFunc to key queued
// objects, knownObjects for deletion detection and resync, and transformer
// (which may be nil) to transform objects as they are enqueued.
//
// It panics when knownObjects is nil.
func NewRealFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter, transformer TransformFunc) *RealFIFO {
	if knownObjects == nil {
		panic("coding error: knownObjects must be provided")
	}

	fifo := new(RealFIFO)
	fifo.items = make([]Delta, 0, 10)
	fifo.keyFunc = keyFunc
	fifo.knownObjects = knownObjects
	fifo.transformer = transformer
	// The condition variable shares the FIFO's own lock.
	fifo.cond.L = &fifo.lock
	return fifo
}