File: policy_source.go

package info (click to toggle)
golang-k8s-apiserver 0.33.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 11,660 kB
  • sloc: sh: 236; makefile: 5
file content (493 lines) | stat: -rw-r--r-- 16,230 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package generic

import (
	"context"
	goerrors "errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

// policyRefreshIntervalDefault is the default period between checks for
// stale policy data.
// TODO: Consider reducing this to a shorter duration or replacing this entirely
// with checks that detect when a policy change took effect.
const policyRefreshIntervalDefault = time.Second

// policyRefreshInterval is the period currently in effect. It starts out as
// the default and is only meant to be overridden by tests.
var policyRefreshInterval = policyRefreshIntervalDefault

// policySource is the Source implementation backed by a policy informer and
// a binding informer. It groups bindings by the policy they reference,
// compiles each policy into an evaluator of type E, and publishes the
// resulting list of PolicyHooks through Hooks.
//
// P is the policy object type, B is the binding object type, and E is the
// evaluator type produced by compiler.
//
// ctx is set by Run and is the parent of the contexts used to run param
// informers. lock guards compiledPolicies and paramsCRDControllers.
type policySource[P runtime.Object, B runtime.Object, E Evaluator] struct {
	ctx                context.Context
	policyInformer     generic.Informer[P]
	bindingInformer    generic.Informer[B]
	restMapper         meta.RESTMapper
	newPolicyAccessor  func(P) PolicyAccessor
	newBindingAccessor func(B) BindingAccessor

	informerFactory informers.SharedInformerFactory
	dynamicClient   dynamic.Interface

	compiler func(P) E

	// Currently compiled list of valid/active policy-binding pairs
	policies atomic.Pointer[[]PolicyHook[P, B, E]]
	// Whether the cache of policies is dirty and needs to be recompiled
	policiesDirty atomic.Bool

	lock             sync.Mutex
	compiledPolicies map[types.NamespacedName]compiledPolicyEntry[E]

	// Temporary until we use the dynamic informer factory
	paramsCRDControllers map[schema.GroupVersionKind]*paramInfo
}

// paramInfo records the informer that was started for a single param kind,
// the REST mapping the kind resolved to, and the function that stops the
// informer.
type paramInfo struct {
	mapping meta.RESTMapping

	// When the param is changed, or the informer is done being used, the cancel
	// func should be called to stop/cleanup the original informer
	cancelFunc func()

	// The lister for this param
	informer informers.GenericInformer
}

// compiledPolicyEntry caches the evaluator compiled from a policy together
// with the resource version of the policy it was compiled from, so that a
// policy is only recompiled when its resource version changes.
type compiledPolicyEntry[E Evaluator] struct {
	policyVersion string
	evaluator     E
}

// PolicyHook is a single entry published by the policy source: one policy,
// all bindings that reference it, the evaluator compiled from the policy,
// and the param informer and scope resolved for the policy's param kind.
//
// ConfigurationError is non-nil when the param kind of the policy could not
// be resolved; in that case ParamInformer and ParamScope are nil.
type PolicyHook[P runtime.Object, B runtime.Object, E Evaluator] struct {
	Policy   P
	Bindings []B

	// ParamInformer is the informer for the param CRD for this policy, or nil if
	// there is no param or if there was a configuration error
	ParamInformer informers.GenericInformer
	ParamScope    meta.RESTScope

	Evaluator          E
	ConfigurationError error
}

// Compile-time assertion that *policySource satisfies the Source interface.
var _ Source[PolicyHook[runtime.Object, runtime.Object, Evaluator]] = &policySource[runtime.Object, runtime.Object, Evaluator]{}

// NewPolicySource builds a Source of PolicyHooks from the given policy and
// binding informers.
//
// newPolicyAccessor and newBindingAccessor adapt the concrete policy and
// binding types to the accessor interfaces used internally, and compiler
// turns a policy into its evaluator. paramInformerFactory, dynamicClient and
// restMapper are used to resolve and watch the param kinds referenced by
// policies.
//
// The constructor only wires up dependencies; see Run for starting the
// source.
func NewPolicySource[P runtime.Object, B runtime.Object, E Evaluator](
	policyInformer cache.SharedIndexInformer,
	bindingInformer cache.SharedIndexInformer,
	newPolicyAccessor func(P) PolicyAccessor,
	newBindingAccessor func(B) BindingAccessor,
	compiler func(P) E,
	paramInformerFactory informers.SharedInformerFactory,
	dynamicClient dynamic.Interface,
	restMapper meta.RESTMapper,
) Source[PolicyHook[P, B, E]] {
	return &policySource[P, B, E]{
		policyInformer:     generic.NewInformer[P](policyInformer),
		bindingInformer:    generic.NewInformer[B](bindingInformer),
		restMapper:         restMapper,
		newPolicyAccessor:  newPolicyAccessor,
		newBindingAccessor: newBindingAccessor,

		informerFactory: paramInformerFactory,
		dynamicClient:   dynamicClient,

		compiler: compiler,

		compiledPolicies:     make(map[types.NamespacedName]compiledPolicyEntry[E]),
		paramsCRDControllers: make(map[schema.GroupVersionKind]*paramInfo),
	}
}

// SetPolicyRefreshIntervalForTests overrides the policy refresh interval and
// returns a function that resets it. The returned function always restores
// the default interval, not whatever value was in effect before the call.
// This should only be called from tests.
func SetPolicyRefreshIntervalForTests(interval time.Duration) func() {
	restore := func() {
		policyRefreshInterval = policyRefreshIntervalDefault
	}
	policyRefreshInterval = interval
	return restore
}

// Run starts the policy source and blocks until ctx is cancelled.
//
// It waits for the policy and binding informers to sync, performs an initial
// compilation of the policy hooks, registers event handlers that mark the
// compiled data dirty on every policy or binding event, and starts a
// background worker that recompiles dirty data every policyRefreshInterval.
// The event handlers are removed again when Run returns.
//
// Run returns an error if the source has already been started, if the
// initial cache sync does not complete, or if an event handler cannot be
// registered. It returns nil once ctx is done.
func (s *policySource[P, B, E]) Run(ctx context.Context) error {
	if s.ctx != nil {
		return fmt.Errorf("policy source already running")
	}

	// Nothing can be reconciled until both informers have finished their
	// initial list.
	if synced := cache.WaitForNamedCacheSync(fmt.Sprintf("%T", s), ctx.Done(), s.UpstreamHasSynced); !synced {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		return fmt.Errorf("initial cache sync for %T failed", s)
	}

	s.ctx = ctx

	// Force the first compilation now that the initial list has finished.
	s.notify()
	s.refreshPolicies()

	// Informer events only flag the compiled data as stale; the actual
	// recompilation happens on the refresh worker started below.
	markDirty := cache.ResourceEventHandlerFuncs{
		AddFunc:    func(interface{}) { s.notify() },
		UpdateFunc: func(interface{}, interface{}) { s.notify() },
		DeleteFunc: func(interface{}) { s.notify() },
	}

	policyHandle, err := s.policyInformer.AddEventHandler(markDirty)
	if err != nil {
		return err
	}
	defer func() {
		if removeErr := s.policyInformer.RemoveEventHandler(policyHandle); removeErr != nil {
			utilruntime.HandleError(fmt.Errorf("failed to remove policy event handler: %w", removeErr))
		}
	}()

	bindingHandle, err := s.bindingInformer.AddEventHandler(markDirty)
	if err != nil {
		return err
	}
	defer func() {
		if removeErr := s.bindingInformer.RemoveEventHandler(bindingHandle); removeErr != nil {
			utilruntime.HandleError(fmt.Errorf("failed to remove binding event handler: %w", removeErr))
		}
	}()

	// Periodically recompile the policy data if it has been marked dirty.
	// The worker exits when ctx is cancelled.
	go func() {
		wait.Until(s.refreshPolicies, policyRefreshInterval, ctx.Done())
	}()

	<-ctx.Done()
	return nil
}

// UpstreamHasSynced reports whether both the policy informer and the binding
// informer have synced.
func (s *policySource[P, B, E]) UpstreamHasSynced() bool {
	if !s.policyInformer.HasSynced() {
		return false
	}
	return s.bindingInformer.HasSynced()
}

// HasSynced implements Source.
//
// It reports whether a non-nil list of policy hooks has been published. A
// missing list, or a nil list, means the policies have not been compiled
// and stored yet.
func (s *policySource[P, B, E]) HasSynced() bool {
	published := s.policies.Load()
	return published != nil && *published != nil
}

// Hooks implements Source.
//
// It returns the most recently published list of policy hooks, or nil if
// nothing has been published yet.
func (s *policySource[P, B, E]) Hooks() []PolicyHook[P, B, E] {
	if current := s.policies.Load(); current != nil {
		return *current
	}
	// Not yet synced
	return nil
}

// refreshPolicies recompiles and publishes the policy hooks if the upstream
// informers have synced and the policy data has been marked dirty. It is a
// no-op otherwise.
func (s *policySource[P, B, E]) refreshPolicies() {
	// The dirty flag is only consumed once upstream has synced, so a flag
	// raised before then is preserved for a later pass.
	if !s.UpstreamHasSynced() || !s.policiesDirty.Swap(false) {
		return
	}

	// The data may be marked dirty again while it is being recalculated.
	// That is fine: the flag stays set and the next pass recalculates,
	// reusing cached evaluators for policies whose resource version is
	// unchanged.
	klog.Infof("refreshing policies")
	hooks, err := s.calculatePolicyData()

	// The list is published even when an error was returned. Policies with a
	// configuration error must still be evaluated (for instance to return an
	// error on failaction), and if listing failed altogether the published
	// list is intentionally wiped.
	s.policies.Store(&hooks)
	if err == nil {
		return
	}

	// Something went wrong while syncing; report it and mark the data dirty
	// again so that a later pass retries.
	utilruntime.HandleError(fmt.Errorf("encountered error syncing policies: %w. Rescheduling policy sync", err))
	s.notify()
}

// notify marks the compiled policy data as dirty so that the next
// refreshPolicies pass recalculates it.
func (s *policySource[P, B, E]) notify() {
	s.policiesDirty.Store(true)
}

// calculatePolicyData builds the list of policy hooks: one entry per policy
// that exists and is referenced by at least one binding, carrying all of
// that policy's bindings.
//
// Policies whose param kind cannot be resolved are still included, with a
// non-nil ConfigurationError. Bindings whose policy does not exist are
// skipped silently. Policies that cannot be fetched, or whose paramKind
// APIVersion cannot be parsed, are omitted from the result. All errors
// except "policy not found" are joined into the returned error, alongside
// the hooks that could be generated.
//
// If upstream has not synced, or the bindings cannot be listed, the result
// is nil and an error is returned.
//
// Evaluators are cached between calls; cache entries and param informers
// that are no longer referenced are released before returning.
func (s *policySource[P, B, E]) calculatePolicyData() ([]PolicyHook[P, B, E], error) {
	if !s.UpstreamHasSynced() {
		return nil, fmt.Errorf("cannot calculate policy data until upstream has synced")
	}

	// Coarse lock; it can be made finer grained if that ever becomes necessary.
	s.lock.Lock()
	defer s.lock.Unlock()

	bindings, err := s.bindingInformer.List(labels.Everything())
	if err != nil {
		// This should never happen unless types are misconfigured
		// (can't use meta.accessor on them)
		return nil, err
	}

	// Group every binding under the policy it refers to.
	bindingsByPolicy := make(map[types.NamespacedName][]B)
	for _, binding := range bindings {
		key := s.newBindingAccessor(binding).GetPolicyName()
		bindingsByPolicy[key] = append(bindingsByPolicy[key], binding)
	}

	// hooks must be a non-nil slice even when empty: a nil list is what
	// HasSynced treats as "not compiled yet".
	hooks := make([]PolicyHook[P, B, E], 0, len(bindings))
	paramsInUse := make(map[schema.GroupVersionKind]struct{})
	var problems []error

	for key, policyBindings := range bindingsByPolicy {
		var lister generic.NamespacedLister[P] = s.policyInformer
		if key.Namespace != "" {
			lister = s.policyInformer.Namespaced(key.Namespace)
		}

		policy, err := lister.Get(key.Name)
		switch {
		case errors.IsNotFound(err):
			// The policy was deleted before its binding, or the binding was
			// created first. Skip it; if the policy shows up later the cache
			// is marked dirty and this function runs again.
			continue
		case err != nil:
			// Not expected: reads come from a cache that was verified to be
			// synced before getting here.
			problems = append(problems, err)
			continue
		}

		var paramGVK *schema.GroupVersionKind
		if declared := s.newPolicyAccessor(policy).GetParamKind(); declared != nil {
			gv, err := schema.ParseGroupVersion(declared.APIVersion)
			if err != nil {
				problems = append(problems, fmt.Errorf("failed to parse paramKind APIVersion: %w", err))
				continue
			}
			paramGVK = &schema.GroupVersionKind{
				Group:   gv.Group,
				Version: gv.Version,
				Kind:    declared.Kind,
			}

			// TEMPORARY UNTIL WE HAVE SHARED PARAM INFORMERS
			paramsInUse[*paramGVK] = struct{}{}
		}

		paramInformer, paramScope, configErr := s.ensureParamsForPolicyLocked(paramGVK)
		evaluator := s.compilePolicyLocked(policy)
		hooks = append(hooks, PolicyHook[P, B, E]{
			Policy:             policy,
			Bindings:           policyBindings,
			ParamInformer:      paramInformer,
			ParamScope:         paramScope,
			Evaluator:          evaluator,
			ConfigurationError: configErr,
		})

		// Report the configuration error so a re-sync gets queued. If the
		// shared param informer could notify us when CRD discovery changes,
		// this could be dropped in favour of relying on that notification.
		if configErr != nil {
			problems = append(problems, configErr)
		}
	}

	// Drop cached evaluators for policies that no binding refers to anymore.
	for key := range s.compiledPolicies {
		if _, bound := bindingsByPolicy[key]; bound {
			continue
		}
		delete(s.compiledPolicies, key)
	}

	// Stop and forget param informers that no policy uses anymore.
	for gvk, info := range s.paramsCRDControllers {
		if _, inUse := paramsInUse[gvk]; inUse {
			continue
		}
		info.cancelFunc()
		delete(s.paramsCRDControllers, gvk)
	}

	// Join yields nil when no problems were collected.
	return hooks, goerrors.Join(problems...)
}

// ensureParamsForPolicyLocked returns the informer and REST scope for the
// given param kind, starting and tracking a new informer if none is tracked
// for that kind yet.
//
// A nil paramSource means the policy has no params; all results are nil.
// If the kind cannot be resolved through the REST mapper an error is
// returned and no informer is started.
//
// Must be called under write lock
func (s *policySource[P, B, E]) ensureParamsForPolicyLocked(paramSource *schema.GroupVersionKind) (informers.GenericInformer, meta.RESTScope, error) {
	if paramSource == nil {
		return nil, nil, nil
	}
	if tracked, ok := s.paramsCRDControllers[*paramSource]; ok {
		return tracked.informer, tracked.mapping.Scope, nil
	}

	groupKind := schema.GroupKind{
		Group: paramSource.Group,
		Kind:  paramSource.Kind,
	}
	mapping, err := s.restMapper.RESTMapping(groupKind, paramSource.Version)
	if err != nil {
		// Failed to resolve. Return an error so the caller can record it as
		// a configuration error and try again on a later sync.
		return nil, nil, fmt.Errorf("failed to find resource referenced by paramKind: '%v'", *paramSource)
	}

	// This param is not being watched yet. Start an informer for it, with
	// its own cancellable context so it can be stopped individually.
	stopCtx, stop := context.WithCancel(s.ctx)

	var paramInformer informers.GenericInformer
	if typed, err := s.informerFactory.ForResource(mapping.Resource); err != nil {
		// Dynamic JSON informer fallback.
		// Cannot use shared dynamic informer since it would be impossible
		// to clean CRD informers properly with multiple dependents
		// (cannot start ahead of time, and cannot track dependencies via stopCh)
		paramInformer = dynamicinformer.NewFilteredDynamicInformer(
			s.dynamicClient,
			mapping.Resource,
			corev1.NamespaceAll,
			// Use same interval as is used for k8s typed sharedInformerFactory
			// https://github.com/kubernetes/kubernetes/blob/7e0923899fed622efbc8679cca6b000d43633e38/cmd/kube-apiserver/app/server.go#L430
			10*time.Minute,
			cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
			nil,
		)
		go paramInformer.Informer().Run(stopCtx.Done())
	} else {
		// The provided informer factory knows this type. We assume the
		// factory is already started, and that starting it starts all types
		// associated with it.
		paramInformer = typed
		s.informerFactory.Start(stopCtx.Done())
	}

	klog.Infof("informer started for %v", *paramSource)
	s.paramsCRDControllers[*paramSource] = &paramInfo{
		mapping:    *mapping,
		cancelFunc: stop,
		informer:   paramInformer,
	}
	return paramInformer, mapping.Scope, nil
}

// getParamInformer returns the informer and REST scope tracked for the given
// param kind, or nil for both if no informer is tracked for it.
//
// For testing
func (s *policySource[P, B, E]) getParamInformer(param schema.GroupVersionKind) (informers.GenericInformer, meta.RESTScope) {
	s.lock.Lock()
	defer s.lock.Unlock()

	info, tracked := s.paramsCRDControllers[param]
	if !tracked {
		return nil, nil
	}
	return info.informer, info.mapping.Scope
}

// compilePolicyLocked returns the evaluator for the given policy.
//
// Evaluators are cached by policy namespace/name. The cached evaluator is
// reused as long as the policy's resource version matches the one it was
// compiled from; otherwise the policy is compiled again and the cache entry
// is replaced. If object metadata cannot be read from the policy, the error
// is reported and the zero value of E is returned.
//
// Must be called under write lock
func (s *policySource[P, B, E]) compilePolicyLocked(policySpec P) E {
	accessor, err := meta.Accessor(policySpec)
	if err != nil {
		// This should not happen if P, and B have ObjectMeta, but
		// unfortunately there is no way to express "able to call
		// meta.Accessor" as a type constraint
		utilruntime.HandleError(err)
		var zero E
		return zero
	}

	key := types.NamespacedName{
		Namespace: accessor.GetNamespace(),
		Name:      accessor.GetName(),
	}
	version := accessor.GetResourceVersion()

	// Cache hit: the policy has not changed since it was last compiled.
	if cached, ok := s.compiledPolicies[key]; ok && cached.policyVersion == version {
		return cached.evaluator
	}

	fresh := compiledPolicyEntry[E]{
		policyVersion: version,
		evaluator:     s.compiler(policySpec),
	}
	s.compiledPolicies[key] = fresh
	return fresh.evaluator
}