File: lostresourceuser.go

package info (click to toggle)
golang-github-linbit-golinstor 0.55.0-2
  • links: PTS, VCS
  • area: main
  • in suites: experimental, forky, sid
  • size: 480 kB
  • sloc: makefile: 11
file content (223 lines) | stat: -rw-r--r-- 5,593 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package monitor

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/LINBIT/golinstor/client"
	"github.com/LINBIT/golinstor/devicelayerkind"
)

// resourceState is the per-resource bookkeeping used to decide whether a
// "may promote" event should schedule a delayed re-check.
type resourceState struct {
	hasQuorum bool // DRBD quorum option was enabled on the resource definition when first seen
	isWatched bool // a delayed re-check goroutine (watch) is currently pending for this resource
}

// haResources is the mutex-protected map of all resources ever seen on the
// event stream, keyed by DRBD resource name. Lock/Unlock come from the
// embedded sync.Mutex; all map access must happen under the lock.
type haResources struct {
	resources map[string]resourceState
	sync.Mutex
}

// LostResourceUser is a struct that exposes the "may promote" state of a DRBD resource
// If a resource may be promoted (i.e., may be switched to Primary) after some grace period, this usually means that its user (that had the resource promoted) failed. It could also happen that the user just terminated/gets rescheduled,... It is up to the user of this API to decide.
// This also means that the user (e.g., some k8s pod) needs to be restarted/rescheduled.
// The LostResourceUser is generic, it sends the names of resources that lost their user on the channel C.
type LostResourceUser struct {
	ctx              context.Context              // governs the lifetime of the event loop and all watch goroutines
	cancel           context.CancelFunc           // cancels ctx; invoked via Stop
	client           *client.Client               // LINSTOR API client used for re-checking resource state
	mayPromoteStream *client.DRBDMayPromoteStream // subscription to DRBD "may promote" events
	haResources      haResources                  // locked bookkeeping of resources seen so far
	initialDelay     time.Duration                // grace period for resources seen for the first time
	existingDelay    time.Duration                // grace period for already known resources

	C chan string // DRBD resource names of resources that may be promoted.
}

// Default grace periods; both can be overridden via WithDelay.
// (Exported with ALL_CAPS names for backward compatibility, even though
// Go convention would prefer MixedCaps.)
const (
	INITIAL_DELAY_DEFAULT  = 1 * time.Minute  // for resources not seen before
	EXISTING_DELAY_DEFAULT = 45 * time.Second // for already known resources
)

// Option represents a configuration option of the LostResourceUser.
// Options are applied by NewLostResourceUser after defaults are set and may
// abort construction by returning a non-nil error.
type Option func(*LostResourceUser) error

// WithDelay overrides the default grace periods: "initial" applies to
// resources seen for the first time, "existing" to already known resources.
func WithDelay(initial, existing time.Duration) Option {
	return func(lr *LostResourceUser) error {
		lr.initialDelay, lr.existingDelay = initial, existing
		return nil
	}
}

// NewLostResourceUser creates a new LostResourceUser. It takes a context, a Go LINSTOR client, and options as its input.
// It subscribes to the LINSTOR DRBD "may promote" event stream and starts a
// goroutine that forwards, on channel C, the names of resources whose user
// appears to be lost. C is closed when the context is canceled, Stop is
// called, or the event stream terminates.
func NewLostResourceUser(ctx context.Context, client *client.Client, options ...Option) (*LostResourceUser, error) {
	// TODO: we only add to the map, we should have a GC that iterates over all the non-watched(?) and rms them.
	mayPromoteStream, err := client.Events.DRBDPromotion(ctx, "current")
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(ctx)

	lr := &LostResourceUser{
		ctx:              ctx,
		cancel:           cancel,
		client:           client,
		mayPromoteStream: mayPromoteStream,
		haResources: haResources{
			resources: make(map[string]resourceState),
		},
		initialDelay:  INITIAL_DELAY_DEFAULT,
		existingDelay: EXISTING_DELAY_DEFAULT,
		C:             make(chan string),
	}

	// apply user-supplied options, possibly overriding the defaults above
	for _, opt := range options {
		if err := opt(lr); err != nil {
			return nil, err
		}
	}

	go func() {
		for {
			select {
			case ev, ok := <-lr.mayPromoteStream.Events:
				if !ok {
					// event stream terminated on its own: shut down and
					// signal consumers of C that no more names will come.
					lr.Stop()
					close(lr.C)
					return
				}
				if !ev.MayPromote {
					continue
				}

				resName := ev.ResourceName

				watch, dur := lr.resShouldWatch(resName)
				if !watch {
					continue
				}
				// re-evaluate the resource after its grace period, in the background
				go lr.watch(resName, dur)
			case <-lr.ctx.Done():
				// Closing the stream ends the event subscription; close C
				// here so consumers unblock, then exit this goroutine.
				lr.mayPromoteStream.Close()
				close(lr.C)
				return
			}
		}
	}()

	return lr, nil
}

// Stop terminates all helper Go routines and closes the connection to the events stream.
// It is safe to call multiple times; canceling the context is idempotent.
func (lr *LostResourceUser) Stop() {
	// canceling ctx unblocks the event loop and any pending watch goroutines
	lr.cancel()
}

// watch waits for the grace period dur, then re-evaluates the "may promote"
// state of resName via the LINSTOR API. If any replica of the resource still
// may be promoted, the name is sent on lr.C. The bookkeeping entry is updated
// (or removed) under the haResources lock.
func (lr *LostResourceUser) watch(resName string, dur time.Duration) {
	// One-shot delay: a Timer is the appropriate primitive here (the
	// previous Ticker only ever fired once before the function returned).
	timer := time.NewTimer(dur)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-lr.ctx.Done():
		return
	}

	// reevaluate the current state
	ress, err := lr.client.Resources.GetAll(lr.ctx, resName)
	// here we might delete it, or reset isWatched
	lr.haResources.Lock()
	defer lr.haResources.Unlock()

	// use errors.Is so a wrapped NotFoundError is also recognized
	if errors.Is(err, client.NotFoundError) {
		// looks like it got deleted. but anyways, nothing we can do, rm it from our dict
		delete(lr.haResources.resources, resName)
		return
	} else if err != nil {
		// unexpected API failure: shut the whole machinery down
		lr.Stop()
		return
	}

	oneMayPromote := false
	for _, r := range ress {
		if r.LayerObject.Type != devicelayerkind.Drbd {
			// top layer is not DRBD (anymore); stop tracking this resource
			delete(lr.haResources.resources, resName)
			return
		}
		if r.LayerObject.Drbd.MayPromote {
			oneMayPromote = true
			break
		}
	}

	if oneMayPromote {
		// NOTE: this send blocks (while holding the lock) until a consumer reads from C
		lr.C <- resName
	}

	res := lr.haResources.resources[resName]
	// if we introduce a GC we need to check for ok here ^^
	// but currently all the deletes are here under this lock
	res.isWatched = false
	lr.haResources.resources[resName] = res
}

// resHasQuorum fetches the resource definition of resName and reports whether
// its DRBD quorum option is enabled (i.e., the property is set to anything
// other than "off").
func (lr *LostResourceUser) resHasQuorum(resName string) (bool, error) {
	rd, err := lr.client.ResourceDefinitions.Get(lr.ctx, resName)
	if err != nil {
		return false, err
	}

	quorum, set := rd.Props["DrbdOptions/Resource/quorum"]
	return set && quorum != "off", nil
}

// resShouldWatch decides whether a "may promote" event for resName should
// start a delayed re-check, and with which grace period: the longer initial
// delay for resources seen for the first time, the shorter one for known
// resources. It returns (false, 0) if the resource is already being watched,
// has no quorum, or its quorum state could not be determined.
func (lr *LostResourceUser) resShouldWatch(resName string) (bool, time.Duration) {
	firstSeen, known := lr.initialDelay, lr.existingDelay

	lr.haResources.Lock()
	defer lr.haResources.Unlock()

	if state, seen := lr.haResources.resources[resName]; seen {
		// already tracked: skip if a watcher is pending or quorum is off
		if state.isWatched || !state.hasQuorum {
			return false, 0
		}
		state.isWatched = true
		lr.haResources.resources[resName] = state
		return true, known
	}

	// first time we see this resource; look up its quorum setting
	quorum, err := lr.resHasQuorum(resName)
	if err != nil {
		// hope for better times...
		return false, 0
	}
	// create the map entry
	lr.haResources.resources[resName] = resourceState{
		hasQuorum: quorum,
		isWatched: quorum, // not a typo: with quorum enabled we start watching right away
	}
	if !quorum {
		return false, 0
	}
	// new one with quorum, give it some time...
	return true, firstSeen
}