1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
package agent
import (
"context"
"sync"
"k8s.io/apimachinery/pkg/util/wait"
)
type errorDetailsTracker struct {
mx sync.Mutex
store map[errorTrackerKey]operationState
wg wait.Group
}
type errorTrackerKey struct {
name string
namespace string
}
// operationState indicates the state of an async operation as it is being watched by the errorDetailsTracker.
type operationState struct {
// version indicates the version of the operation for which the state is being tracked
version uint64
// errorDetails contains error details available at the end of an async operation. It will be nil if the operation has yet to finish execution
errorDetails *ErrorDetails
}
func newErrorDetailsTracker() *errorDetailsTracker {
return &errorDetailsTracker{store: make(map[errorTrackerKey]operationState)}
}
func (t *errorDetailsTracker) deleteErrorIfVersion(name string, namespace string, version uint64) {
key := errorTrackerKey{
name: name,
namespace: namespace,
}
t.mx.Lock()
defer t.mx.Unlock()
existingState, exists := t.store[key]
if exists && existingState.version == version {
// only delete entries if they correspond to the version passed in the
// function call. This check prevents cases where entries for a more
// recent operation(with a higher version) may be overwritten with
// error details of an older operation(with a lower version)
delete(t.store, key)
}
}
// saveErrorIfVersion will record error details only if the provided version
// matches the version in an existing record. In every other case, nothing
// will be written
func (t *errorDetailsTracker) saveErrorIfVersion(name string, namespace string, errorDetails *ErrorDetails, version uint64) {
key := errorTrackerKey{
name: name,
namespace: namespace,
}
t.mx.Lock()
defer t.mx.Unlock()
existingState, exists := t.store[key]
if !exists {
// Do NOT write anything if nothing exists
return
}
if existingState.version != version {
// this check is added so to prevent overwrite of errors
// for entries with mismatched versions
return
}
t.store[key] = operationState{
version: existingState.version,
errorDetails: errorDetails,
}
}
// watchForLatestErrors will watch the provided non-nil channel for error details and record them asynchronously. If multiple watches are created for
// different versions for the same workspace & namespace, only the error details corresponding to the latest version are tracked while entries for earlier
// versions are discarded. If multiple values are published in the channel, only the first error detail will be recorded. The caller is responsible for ensuring that at-most one value is published on the passed channel
func (t *errorDetailsTracker) watchForLatestErrors(ctx context.Context, name string, namespace string, version uint64, asyncErrDetails <-chan *ErrorDetails) {
t.markEntryWithVersion(name, namespace, version)
t.wg.StartWithContext(ctx, func(ctx context.Context) {
errDetails := <-asyncErrDetails
if errDetails != nil {
// at least one error details was received on the channel, so it must be saved
t.saveErrorIfVersion(name, namespace, errDetails, version)
} else {
// either nil value or nothing was received on the channel and so the
// entry for provided version can be safely evicted
t.deleteErrorIfVersion(name, namespace, version)
}
})
}
// waitForErrors waits on existing channels being watched to finish publishing error details(if any)
func (t *errorDetailsTracker) waitForErrors() {
t.wg.Wait()
}
// markEntryWithVersion will create an entry in the store if and only if
// no entry exists for the particular key OR an entry exists with an older version.
// If the version of an existing entry is higher (not equal) than the version passed in the function
// call, then the writes are skipped and nothing is updated
func (t *errorDetailsTracker) markEntryWithVersion(workspace string, namespace string, version uint64) {
key := errorTrackerKey{
name: workspace,
namespace: namespace,
}
t.mx.Lock()
defer t.mx.Unlock()
existingState, exists := t.store[key]
if exists && existingState.version > version {
// writes should be skipped here as an older version MUST not
// overwrite the entries corresponding to a newer version
return
}
t.store[key] = operationState{
version: version,
errorDetails: nil,
}
}
func (t *errorDetailsTracker) createSnapshot() map[errorTrackerKey]operationState {
snapshot := make(map[errorTrackerKey]operationState)
t.mx.Lock()
defer t.mx.Unlock()
for key, state := range t.store {
snapshot[key] = state
}
return snapshot
}
|