File: error_details_tracker.go

package info (click to toggle)
gitlab-agent 16.11.5-1
  • links: PTS, VCS
  • area: contrib
  • in suites: experimental
  • size: 7,072 kB
  • sloc: makefile: 193; sh: 55; ruby: 3
file content (145 lines) | stat: -rw-r--r-- 4,657 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package agent

import (
	"context"
	"sync"

	"k8s.io/apimachinery/pkg/util/wait"
)

type errorDetailsTracker struct {
	mx    sync.Mutex
	store map[errorTrackerKey]operationState
	wg    wait.Group
}

type errorTrackerKey struct {
	name      string
	namespace string
}

// operationState indicates the state of an async operation as it is being watched by the errorDetailsTracker.
type operationState struct {
	// version indicates the version of the operation for which the state is being tracked
	version uint64

	// errorDetails contains error details available at the end of an async operation. It will be nil if the operation has yet to finish execution
	errorDetails *ErrorDetails
}

func newErrorDetailsTracker() *errorDetailsTracker {
	return &errorDetailsTracker{store: make(map[errorTrackerKey]operationState)}
}

func (t *errorDetailsTracker) deleteErrorIfVersion(name string, namespace string, version uint64) {
	key := errorTrackerKey{
		name:      name,
		namespace: namespace,
	}

	t.mx.Lock()
	defer t.mx.Unlock()

	existingState, exists := t.store[key]

	if exists && existingState.version == version {
		// only delete entries if they correspond to the version passed in the
		// function call. This check prevents cases where entries for a more
		// recent operation(with a higher version) may be overwritten with
		// error details of an older operation(with a lower version)
		delete(t.store, key)
	}
}

// saveErrorIfVersion will record error details only if the provided version
// matches the version in an existing record. In every other case, nothing
// will be written
func (t *errorDetailsTracker) saveErrorIfVersion(name string, namespace string, errorDetails *ErrorDetails, version uint64) {
	key := errorTrackerKey{
		name:      name,
		namespace: namespace,
	}

	t.mx.Lock()
	defer t.mx.Unlock()

	existingState, exists := t.store[key]
	if !exists {
		// Do NOT write anything if nothing exists
		return
	}

	if existingState.version != version {
		// this check is added so to prevent overwrite of errors
		// for entries with mismatched versions
		return
	}
	t.store[key] = operationState{
		version:      existingState.version,
		errorDetails: errorDetails,
	}
}

// watchForLatestErrors will watch the provided non-nil channel for error details and record them asynchronously. If multiple watches are created for
// different versions for the same workspace & namespace, only the error details corresponding to the latest version are tracked while entries for earlier
// versions are discarded. If multiple values are published in the channel, only the first error detail will be recorded. The caller is responsible for ensuring that at-most one value is published on the passed channel
func (t *errorDetailsTracker) watchForLatestErrors(ctx context.Context, name string, namespace string, version uint64, asyncErrDetails <-chan *ErrorDetails) {
	t.markEntryWithVersion(name, namespace, version)

	t.wg.StartWithContext(ctx, func(ctx context.Context) {
		errDetails := <-asyncErrDetails
		if errDetails != nil {
			// at least one error details was received on the channel, so it must be saved
			t.saveErrorIfVersion(name, namespace, errDetails, version)
		} else {
			// either nil value or nothing was received on the channel and so the
			// entry for provided version can be safely evicted
			t.deleteErrorIfVersion(name, namespace, version)
		}
	})
}

// waitForErrors waits on existing channels being watched to finish publishing error details(if any)
func (t *errorDetailsTracker) waitForErrors() {
	t.wg.Wait()
}

// markEntryWithVersion will create an entry in the store if and only if
// no entry exists for the particular key OR an entry exists with an older version.
// If the version of an existing entry is higher (not equal) than the version passed in the function
// call, then the writes are skipped and nothing is updated
func (t *errorDetailsTracker) markEntryWithVersion(workspace string, namespace string, version uint64) {
	key := errorTrackerKey{
		name:      workspace,
		namespace: namespace,
	}

	t.mx.Lock()
	defer t.mx.Unlock()

	existingState, exists := t.store[key]

	if exists && existingState.version > version {
		// writes should be skipped here as an older version MUST not
		// overwrite the entries corresponding to a newer version
		return
	}

	t.store[key] = operationState{
		version:      version,
		errorDetails: nil,
	}
}

func (t *errorDetailsTracker) createSnapshot() map[errorTrackerKey]operationState {
	snapshot := make(map[errorTrackerKey]operationState)

	t.mx.Lock()
	defer t.mx.Unlock()

	for key, state := range t.store {
		snapshot[key] = state
	}

	return snapshot
}