File: manager.go

package info (click to toggle)
docker.io 26.1.5%2Bdfsg1-9
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 68,576 kB
  • sloc: sh: 5,748; makefile: 912; ansic: 664; asm: 228; python: 162
file content (490 lines) | stat: -rw-r--r-- 15,592 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
package csi

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/docker/go-events"

	"github.com/moby/swarmkit/v2/api"
	"github.com/moby/swarmkit/v2/log"
	"github.com/moby/swarmkit/v2/manager/state/store"
	mobyplugin "github.com/moby/swarmkit/v2/node/plugin"
	"github.com/moby/swarmkit/v2/volumequeue"
)

// DockerCSIPluginCap is the capability passed to the plugin getter so that
// only CSI plugins offering the Controller service are returned. The
// complete interface name is "docker.csicontroller/1.0".
const DockerCSIPluginCap = "csicontroller"

// CSIRPCTimeout bounds, on the client side, how long a single round of RPCs
// to a CSI plugin may take.
const CSIRPCTimeout = 15 * time.Second

// Manager is the manager-side driver of CSI volumes. It watches the store for
// Volume and Node events, queues affected volumes in pendingVolumes, and
// performs create, publish, unpublish and delete operations through the CSI
// controller plugins it loads from pg.
type Manager struct {
	// store is the cluster memory store. It is watched for events and
	// updated with the results of plugin operations.
	store *store.MemoryStore

	// provider looks up secrets; it is handed to each Plugin built by
	// newPlugin.
	provider SecretProvider

	// pg gives access to the Docker Engine's plugin store.
	pg mobyplugin.Getter

	// newPlugin builds a Plugin from an address-bearing plugin. It is a field
	// instead of a direct call so unit tests can substitute a fake Plugin.
	newPlugin func(addr mobyplugin.AddrPlugin, secrets SecretProvider) Plugin

	// startOnce makes sure the run loop is started at most once.
	startOnce sync.Once

	// stopChan is closed (exactly once, via stopOnce) to ask the run loop to
	// exit; doneChan is closed by the run loop once it has exited.
	stopChan chan struct{}
	stopOnce sync.Once
	doneChan chan struct{}

	// plugins caches every Plugin loaded so far, keyed by driver name.
	plugins map[string]Plugin

	// pendingVolumes holds volume IDs, with their attempt counts, waiting to
	// be processed.
	pendingVolumes *volumequeue.VolumeQueue
}

// NewManager builds a Manager backed by the store s that loads CSI controller
// plugins through pg. Run must be called to start it.
func NewManager(s *store.MemoryStore, pg mobyplugin.Getter) *Manager {
	vm := &Manager{
		store:     s,
		pg:        pg,
		newPlugin: NewPlugin,
		plugins:   make(map[string]Plugin),
		stopChan:  make(chan struct{}),
		doneChan:  make(chan struct{}),
	}
	vm.provider = NewSecretProvider(s)
	vm.pendingVolumes = volumequeue.NewVolumeQueue()
	return vm
}

// Run starts the Manager and blocks until it has stopped.
//
// ctx is the parent of every RPC the Manager makes to CSI plugins, and it
// also supplies the Manager's logging context. Canceling ctx cancels those
// RPCs, but Stop is the preferred way to shut the Manager down.
//
// Only the first call to Run starts the Manager. Because this goes through
// sync.Once, any later call will not return until that first call has.
func (vm *Manager) Run(ctx context.Context) {
	start := func() { vm.run(ctx) }
	vm.startOnce.Do(start)
}

// run does the real work of Run; it executes at most once.
//
// The argument is called pctx because it is the parent context. A cancelable
// child context is derived from it immediately and used for all of the
// Manager's work, including CSI RPCs.
//
// run starts one goroutine that drains pendingVolumes, while the calling
// goroutine handles store events. When Stop closes stopChan, the queue is
// stopped and the child context is canceled, so any in-flight RPC is aborted
// rather than left to run until CSIRPCTimeout. run then waits for the
// processing goroutine to exit before returning.
func (vm *Manager) run(pctx context.Context) {
	// Registered first, so it runs last: Stop (which waits on doneChan) only
	// returns after every other deferred cleanup below has finished.
	defer close(vm.doneChan)
	ctx, ctxCancel := context.WithCancel(
		log.WithModule(pctx, "csi/manager"),
	)
	defer ctxCancel()

	watch, cancel, err := store.ViewAndWatch(vm.store, func(tx store.ReadTx) error {
		// TODO(dperny): change this from ViewAndWatch to one that's just
		// Watch.
		return nil
	})

	if err != nil {
		log.G(ctx).WithError(err).Error("error in store view and watch")
		return
	}
	defer cancel()

	vm.init(ctx)

	// Process queued volumes one at a time. Wait returns ("", 0) once
	// pendingVolumes has been stopped, which is the signal to exit.
	doneProc := make(chan struct{})
	go func() {
		// Closing doneProc tells run that this goroutine has exited.
		defer close(doneProc)
		for {
			id, attempt := vm.pendingVolumes.Wait()
			if id == "" && attempt == 0 {
				return
			}
			// TODO(dperny): we can launch some number of workers and process
			// more than one volume at a time, if desired.
			vm.processVolume(ctx, id, attempt)
		}
	}()

	// Deferred after the cleanups above, so it runs before them. This keeps
	// doneChan open (and Stop blocked) until the processing goroutine has
	// exited.
	defer func() {
		<-doneProc
	}()

	for {
		select {
		case ev := <-watch:
			vm.handleEvent(ev)
		case <-vm.stopChan:
			vm.pendingVolumes.Stop()
			// Cancel now instead of relying on the deferred ctxCancel. That
			// call only runs after the wait on doneProc, which would let an
			// in-flight RPC keep Stop blocked for up to CSIRPCTimeout.
			ctxCancel()
			return
		}
	}
}

// processVolume handles one entry taken off pendingVolumes: the Volume with
// the given id on the given attempt number.
//
// The work is bounded by CSIRPCTimeout. If handleVolume reports an error, the
// volume is put back on the queue with the attempt number incremented.
func (vm *Manager) processVolume(ctx context.Context, id string, attempt uint) {
	fields := log.Fields{
		"volume.id": id,
		"attempt":   attempt,
	}

	// A client-side deadline keeps one slow plugin call from holding up the
	// processing of every other volume.
	tctx, cancel := context.WithTimeout(log.WithFields(ctx, fields), CSIRPCTimeout)
	defer cancel()

	if err := vm.handleVolume(tctx, id); err != nil {
		// TODO(dperny): differentiate between retryable and non-retryable
		// errors.
		log.G(tctx).WithError(err).Info("error handling volume")
		vm.pendingVolumes.Enqueue(id, attempt+1)
	}
}

// init performs the Manager's one-time startup work. It reads every Node and
// Volume currently in the store, feeds each Node through handleNode (loading
// the plugins those nodes report), and enqueues every Volume.
//
// Enqueueing every Volume is the simplest way to find the ones that need
// work: a Volume with nothing to do passes through handleVolume quickly.
func (vm *Manager) init(ctx context.Context) {
	var nodes []*api.Node
	var volumes []*api.Volume

	vm.store.View(func(tx store.ReadTx) {
		var err error
		// Find only fails when the By selector is invalid, so these errors are
		// not expected with store.All; they are logged rather than returned.
		if nodes, err = store.FindNodes(tx, store.All); err != nil {
			log.G(ctx).WithError(err).Error("error finding nodes")
		}
		if volumes, err = store.FindVolumes(tx, store.All); err != nil {
			log.G(ctx).WithError(err).Error("error finding volumes")
		}
	})

	for _, n := range nodes {
		vm.handleNode(n)
	}
	for _, v := range volumes {
		vm.enqueueVolume(v.ID)
	}
}

// Stop asks the Manager to shut down and waits until Run has returned.
//
// Calling Stop more than once is safe. The stop signal is only sent the first
// time, and every call waits on the same completion channel.
//
// NOTE(review): doneChan is only closed by run, so if Run is never called,
// Stop blocks forever.
func (vm *Manager) Stop() {
	signal := func() { close(vm.stopChan) }
	vm.stopOnce.Do(signal)
	<-vm.doneChan
}

// handleEvent dispatches one store event. Node create/update events are passed
// to handleNode and node deletions to handleNodeRemove. Volume create/update
// events enqueue the Volume for processing. All other events are ignored.
func (vm *Manager) handleEvent(ev events.Event) {
	switch event := ev.(type) {
	case api.EventCreateNode:
		vm.handleNode(event.Node)
	case api.EventUpdateNode:
		// An update only ever adds the node to plugins. If the node stops
		// reporting CSIInfo for a plugin, that plugin keeps the stale entry.
		// This is accepted: the memory cost is small and such updates should
		// be rare. It may change as the volume code matures.
		vm.handleNode(event.Node)
	case api.EventDeleteNode:
		vm.handleNodeRemove(event.Node.ID)
	case api.EventCreateVolume:
		vm.enqueueVolume(event.Volume.ID)
	case api.EventUpdateVolume:
		vm.enqueueVolume(event.Volume.ID)
	}
}

// createVolume asks the Volume's driver to create it. On success, it stores
// the returned VolumeInfo on the Volume object in the store.
//
// Failures are logged here and also returned, so the caller can schedule a
// retry.
func (vm *Manager) createVolume(ctx context.Context, v *api.Volume) error {
	logger := log.G(ctx).WithField("volume.id", v.ID).WithField("driver", v.Spec.Driver.Name)
	logger.Info("creating volume")

	plug, err := vm.getPlugin(v.Spec.Driver.Name)
	if err != nil {
		logger.Errorf("volume creation failed: %s", err.Error())
		return err
	}

	volumeInfo, err := plug.CreateVolume(ctx, v)
	if err != nil {
		logger.WithError(err).Error("volume create failed")
		return err
	}

	if err := vm.store.Update(func(tx store.Tx) error {
		current := store.GetVolume(tx, v.ID)
		if current == nil {
			// No known way for the Volume to disappear here, but guard
			// anyway rather than dereference nil.
			return nil
		}
		current.VolumeInfo = volumeInfo
		return store.UpdateVolume(tx, current)
	}); err != nil {
		logger.WithError(err).Error("committing created volume to store failed")
		return err
	}
	return nil
}

// enqueueVolume puts the Volume with the given id on pendingVolumes as a fresh
// attempt, with the attempt number reset to 0.
//
// It is used for new work: volume events and startup (init). Retries go
// through processVolume, which enqueues with an incremented attempt number.
func (vm *Manager) enqueueVolume(id string) {
	const firstAttempt uint = 0
	vm.pendingVolumes.Enqueue(id, firstAttempt)
}

// handleVolume reconciles the Volume with the given id against its CSI
// plugin. Depending on the Volume's state, it does exactly one of:
//   - nothing, if the Volume no longer exists;
//   - create it, if it has no VolumeInfo yet;
//   - delete it, if PendingDelete is set;
//   - otherwise, publish every PublishStatus entry in PENDING_PUBLISH and
//     unpublish every entry in PENDING_UNPUBLISH.
//
// A non-nil error means some of the work failed and the Volume should be
// retried. The store may have been updated even when an error is returned,
// because partial publish/unpublish progress is still recorded.
func (vm *Manager) handleVolume(ctx context.Context, id string) error {
	var volume *api.Volume
	vm.store.View(func(tx store.ReadTx) {
		volume = store.GetVolume(tx, id)
	})

	switch {
	case volume == nil:
		// The Volume is gone: nothing to do and nothing to retry.
		return nil
	case volume.VolumeInfo == nil:
		return vm.createVolume(ctx, volume)
	case volume.PendingDelete:
		return vm.deleteVolume(ctx, volume)
	}

	// publish asks the driver to publish the volume to st's node. On success
	// it marks st PUBLISHED and records the returned publish context.
	publish := func(st *api.VolumePublishStatus) error {
		plug, err := vm.getPlugin(volume.Spec.Driver.Name)
		if err != nil {
			return err
		}
		pubCtx, err := plug.PublishVolume(ctx, volume, st.NodeID)
		if err != nil {
			return err
		}
		st.State = api.VolumePublishStatus_PUBLISHED
		st.PublishContext = pubCtx
		st.Message = ""
		return nil
	}

	// unpublish asks the driver to unpublish the volume from st's node.
	unpublish := func(st *api.VolumePublishStatus) error {
		plug, err := vm.getPlugin(volume.Spec.Driver.Name)
		if err != nil {
			return err
		}
		return plug.UnpublishVolume(ctx, volume, st.NodeID)
	}

	// snapshot holds the same status pointers as volume.PublishStatus.
	// Iterating over the snapshot lets successfully unpublished entries be cut
	// out of volume.PublishStatus during the loop. removed counts those cuts,
	// so idx-removed is an entry's current position in volume.PublishStatus.
	// TODO(dperny): it's just pointers, but copying the entire PublishStatus
	// on each update might be intensive.
	snapshot := make([]*api.VolumePublishStatus, len(volume.PublishStatus))
	copy(snapshot, volume.PublishStatus)

	var (
		dirty   bool
		removed int
		// failed lists the node IDs whose publish or unpublish failed. They
		// still make this call return an error after the store is updated.
		failed = []string{}
	)

	for idx, st := range snapshot {
		switch st.State {
		case api.VolumePublishStatus_PENDING_PUBLISH:
			dirty = true
			if err := publish(st); err != nil {
				st.Message = fmt.Sprintf("error publishing volume: %v", err)
				failed = append(failed, st.NodeID)
			}
		case api.VolumePublishStatus_PENDING_UNPUBLISH:
			dirty = true
			if err := unpublish(st); err != nil {
				st.Message = fmt.Sprintf("error unpublishing volume: %v", err)
				failed = append(failed, st.NodeID)
				continue
			}
			// Unpublished: drop the entry from the Volume's statuses.
			pos := idx - removed
			volume.PublishStatus = append(volume.PublishStatus[:pos], volume.PublishStatus[pos+1:]...)
			removed++
		}
	}

	if dirty {
		// NOTE(review): the stored PublishStatus is replaced wholesale by the
		// local copy. Entries added or changed in the store while the RPCs
		// above were in flight would be overwritten — confirm whether that
		// can happen.
		err := vm.store.Update(func(tx store.Tx) error {
			stored := store.GetVolume(tx, volume.ID)
			if stored == nil {
				// A Volume should never be deleted with pending publishes;
				// if it somehow is, ignore it rather than crash.
				return nil
			}
			stored.PublishStatus = volume.PublishStatus
			return store.UpdateVolume(tx, stored)
		})
		if err != nil {
			return err
		}
	}

	if len(failed) > 0 {
		return fmt.Errorf("error publishing or unpublishing to some nodes: %v", failed)
	}
	return nil
}

// handleNode handles one Node event. For each plugin listed in the node's
// CSIInfo, it passes the node's swarm ID and CSI node ID to that plugin's
// AddNode. Nodes without a Description are ignored.
//
// AddNode is called on every event rather than only when something changed.
// The original author expected that to be cheaper; AddNode's cost is not
// visible here.
func (vm *Manager) handleNode(n *api.Node) {
	desc := n.Description
	if desc == nil {
		return
	}
	for _, csiInfo := range desc.CSIInfo {
		plug, err := vm.getPlugin(csiInfo.PluginName)
		if err != nil {
			// No context is available here, so use the global logger.
			log.L.Warnf("error handling node: %v", err)
			continue
		}
		plug.AddNode(n.ID, csiInfo.NodeID)
	}
}

// managerPluginsMu guards the plugins map of every Manager.
//
// Two goroutines started by run reach that map. The event loop does so
// through handleNode (via getPlugin) and handleNodeRemove. The
// volume-processing goroutine does so through handleVolume, createVolume and
// deleteVolume, all via getPlugin. getPlugin writes to the map when it loads a
// plugin, so without a lock the two goroutines race. The Go runtime may abort
// the process on concurrent map access.
//
// The lock is held only around map operations, never across calls into
// plugins or the plugin getter.
//
// TODO: move this into Manager as a field next to plugins.
//
// NOTE(review): this does not synchronize the Plugin objects themselves.
// AddNode/RemoveNode (event loop) can still run concurrently with RPC methods
// (processing goroutine) on the same Plugin — confirm whether Plugin
// implementations guard their own state.
var managerPluginsMu sync.Mutex

// handleNodeRemove handles a node delete event by calling RemoveNode on every
// plugin loaded so far.
//
// Every loaded plugin is told, instead of only the plugins the node used,
// because that is simpler than tracking which plugins each node reported.
// Plugins that have not been loaded yet are skipped; no Plugin object exists
// for them, so there is nothing to remove.
func (vm *Manager) handleNodeRemove(nodeID string) {
	// Snapshot under the lock, then call out without it held.
	managerPluginsMu.Lock()
	loaded := make([]Plugin, 0, len(vm.plugins))
	for _, plugin := range vm.plugins {
		loaded = append(loaded, plugin)
	}
	managerPluginsMu.Unlock()

	for _, plugin := range loaded {
		plugin.RemoveNode(nodeID)
	}
}

// deleteVolume asks the Volume's driver to delete it and, if that succeeds,
// removes the Volume object from the store. Any error is returned so the
// caller can retry.
func (vm *Manager) deleteVolume(ctx context.Context, v *api.Volume) error {
	// TODO(dperny): handle missing plugin
	plug, err := vm.getPlugin(v.Spec.Driver.Name)
	if err != nil {
		return err
	}
	err = plug.DeleteVolume(ctx, v)
	if err != nil {
		return err
	}

	// TODO(dperny): handle update error
	return vm.store.Update(func(tx store.Tx) error {
		return store.DeleteVolume(tx, v.ID)
	})
}

// getPlugin returns the plugin with the given name, loading it on first use.
//
// In a previous iteration of the architecture of this component, plugins were
// added to the manager through an update to the Cluster object, which
// triggered an event. In other words, they were eagerly loaded.
//
// When rearchitecting to use the plugingetter.PluginGetter interface, that
// eager loading is no longer practical, because the method for getting events
// about new plugins would be difficult to plumb this deep into swarm.
//
// Instead, what used to be raw map lookups is now a method call that
// lazy-loads plugins as needed. This is fine, because the Plugin object also
// makes its network connection lazily.
//
// getPlugin is called from more than one goroutine, so access to vm.plugins
// is guarded by managerPluginsMu. The lock is released while the plugin getter
// is consulted. If two callers load the same plugin concurrently, the first
// one stored wins, so there is only ever one Plugin per name.
//
// It returns an error if the getter fails, if no plugin is found, or if the
// plugin does not implement mobyplugin.AddrPlugin.
//
// TODO(dperny): There is no way to unload a plugin. Unloading plugins will
// happen as part of a leadership change, but otherwise, on especially
// long-lived managers with especially high plugin churn, this is a memory
// leak. It's acceptable for now because we expect neither exceptionally long
// lived managers nor exceptionally high plugin churn.
func (vm *Manager) getPlugin(name string) (Plugin, error) {
	// Fast path: the plugin is already loaded.
	managerPluginsMu.Lock()
	p, ok := vm.plugins[name]
	managerPluginsMu.Unlock()
	if ok {
		return p, nil
	}

	// Otherwise, load it. The lock is not held while calling the getter.
	pc, err := vm.pg.Get(name, DockerCSIPluginCap)
	if err != nil {
		return nil, err
	}

	if pc == nil {
		return nil, errors.New("driver \"" + name + "\" not found")
	}

	pa, ok := pc.(mobyplugin.AddrPlugin)
	if !ok {
		return nil, errors.New("plugin for driver \"" + name + "\" does not implement PluginAddr")
	}

	managerPluginsMu.Lock()
	defer managerPluginsMu.Unlock()
	// Another goroutine may have stored this plugin while the lock was
	// released; reuse its instance instead of replacing it.
	if existing, ok := vm.plugins[name]; ok {
		return existing, nil
	}
	p = vm.newPlugin(pa, vm.provider)
	vm.plugins[name] = p

	return p, nil
}