File: volumes.go

Package: docker.io 26.1.5+dfsg1-9

package scheduler

import (
	"fmt"
	"strings"

	"github.com/moby/swarmkit/v2/api"
	"github.com/moby/swarmkit/v2/manager/state/store"
)

// the scheduler package does double duty -- in addition to choosing nodes, it
// must also choose volumes. this is because volumes are fungible, and can be
// scheduled to several nodes, and used by several tasks. we should endeavor to
// spread tasks across volumes, like we spread nodes. on the positive side,
// unlike nodes, volumes are not hierarchical. that is, we don't need to
// spread across multiple levels of a tree, only a flat set.

// volumeSet is the set of all volumes currently managed
type volumeSet struct {
	// volumes is a mapping of volume IDs to volumeInfo
	volumes map[string]volumeInfo
	// byGroup is a mapping from a volume group name to a set of volumes in
	// that group
	byGroup map[string]map[string]struct{}
	// byName is a mapping of volume names to swarmkit volume IDs.
	byName map[string]string
}

// volumeUsage contains information about the usage of a Volume by a specific
// task.
type volumeUsage struct {
	nodeID   string
	readOnly bool
}

// volumeInfo contains scheduler information about a given volume
type volumeInfo struct {
	volume *api.Volume
	tasks  map[string]volumeUsage
	// nodes is a set of nodes a volume is in use on. it maps a node ID to a
	// reference count for how many tasks are using the volume on that node.
	nodes map[string]int
}

func newVolumeSet() *volumeSet {
	return &volumeSet{
		volumes: map[string]volumeInfo{},
		byGroup: map[string]map[string]struct{}{},
		byName:  map[string]string{},
	}
}
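
// a minimal usage sketch of the volumeSet lifecycle, assuming a *api.Volume
// v, an *api.Task task, and a *NodeInfo nodeInfo are already in hand (the
// variable names are illustrative, not part of this package). note that
// chooseTaskVolumes releases its own reservations, so the caller re-reserves:
//
//	vs := newVolumeSet()
//	vs.addOrUpdateVolume(v)
//	attachments, err := vs.chooseTaskVolumes(task, nodeInfo)
//	if err != nil {
//		return // no satisfying set of volumes exists on this node
//	}
//	task.Volumes = attachments
//	vs.reserveTaskVolumes(task)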

// getVolume returns the volume object for the given ID as stored in the
// volumeSet, or nil if none exists.
//
//nolint:unused // TODO(thaJeztah) this is currently unused: is it safe to remove?
func (vs *volumeSet) getVolume(id string) *api.Volume {
	return vs.volumes[id].volume
}

func (vs *volumeSet) addOrUpdateVolume(v *api.Volume) {
	if info, ok := vs.volumes[v.ID]; !ok {
		vs.volumes[v.ID] = volumeInfo{
			volume: v,
			nodes:  map[string]int{},
			tasks:  map[string]volumeUsage{},
		}
	} else {
		// if the volume already exists in the set, then only update the
		// volume object, not the tasks map. the map holds volumeInfo by
		// value, so the modified copy must be written back.
		info.volume = v
		vs.volumes[v.ID] = info
	}

	if set, ok := vs.byGroup[v.Spec.Group]; ok {
		set[v.ID] = struct{}{}
	} else {
		vs.byGroup[v.Spec.Group] = map[string]struct{}{v.ID: {}}
	}
	vs.byName[v.Spec.Annotations.Name] = v.ID
}

//nolint:unused // only used in tests.
func (vs *volumeSet) removeVolume(volumeID string) {
	if info, ok := vs.volumes[volumeID]; ok {
		// if the volume exists in the set, look up its group ID and remove it
		// from the byGroup mapping as well
		group := info.volume.Spec.Group
		delete(vs.byGroup[group], volumeID)
		delete(vs.volumes, volumeID)
		delete(vs.byName, info.volume.Spec.Annotations.Name)
	}
}

// chooseTaskVolumes selects a set of VolumeAttachments for the task on the
// given node. it expects that the node was already validated to have the
// necessary volumes, but it will return an error if a full set of volumes is
// not available.
func (vs *volumeSet) chooseTaskVolumes(task *api.Task, nodeInfo *NodeInfo) ([]*api.VolumeAttachment, error) {
	volumes := []*api.VolumeAttachment{}

	// we'll reserve volumes in this loop, but release all of our reservations
	// before we finish. the caller will need to call reserveTaskVolumes after
	// calling this function.
	// TODO(dperny): this is probably not optimal
	defer func() {
		for _, volume := range volumes {
			vs.releaseVolume(volume.ID, task.ID)
		}
	}()

	// TODO(dperny): handle non-container tasks
	c := task.Spec.GetContainer()
	if c == nil {
		return nil, nil
	}
	for _, mount := range c.Mounts {
		if mount.Type == api.MountTypeCluster {
			candidate := vs.isVolumeAvailableOnNode(&mount, nodeInfo)
			if candidate == "" {
				// TODO(dperny): return structured error types, instead of
				// error strings
				return nil, fmt.Errorf("cannot find volume to satisfy mount with source %v", mount.Source)
			}
			vs.reserveVolume(candidate, task.ID, nodeInfo.Node.ID, mount.ReadOnly)
			volumes = append(volumes, &api.VolumeAttachment{
				ID:     candidate,
				Source: mount.Source,
				Target: mount.Target,
			})
		}
	}

	return volumes, nil
}
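
// only mounts of type MountTypeCluster participate in volume scheduling. a
// task spec mount shaped like this sketch (field values hypothetical) is
// what chooseTaskVolumes resolves against the volumeSet:
//
//	api.Mount{
//		Type:     api.MountTypeCluster,
//		Source:   "data1",
//		Target:   "/srv/data",
//		ReadOnly: true,
//	}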

// reserveTaskVolumes identifies all volumes currently in use on a task and
// marks them in the volumeSet as in use.
func (vs *volumeSet) reserveTaskVolumes(task *api.Task) {
	for _, va := range task.Volumes {
		// we shouldn't need to handle non-container tasks because those tasks
		// won't have any entries in task.Volumes.
		for _, mount := range task.Spec.GetContainer().Mounts {
			if mount.Source == va.Source && mount.Target == va.Target {
				vs.reserveVolume(va.ID, task.ID, task.NodeID, mount.ReadOnly)
			}
		}
	}
}

func (vs *volumeSet) reserveVolume(volumeID, taskID, nodeID string, readOnly bool) {
	info, ok := vs.volumes[volumeID]
	if !ok {
		// TODO(dperny): don't just return nothing.
		return
	}

	info.tasks[taskID] = volumeUsage{nodeID: nodeID, readOnly: readOnly}
	// increment the reference count for this node.
	info.nodes[nodeID]++
}

func (vs *volumeSet) releaseVolume(volumeID, taskID string) {
	info, ok := vs.volumes[volumeID]
	if !ok {
		// if the volume isn't in the set, no action to take.
		return
	}

	// decrement the reference count for this task's node
	usage, ok := info.tasks[taskID]
	if ok {
		// this is probably an unnecessarily high level of caution, but make
		// sure we don't go below zero on node count.
		if c := info.nodes[usage.nodeID]; c > 0 {
			info.nodes[usage.nodeID] = c - 1
		}
		delete(info.tasks, taskID)
	}
}
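
// a sketch of the reference counting, assuming a volume with ID "vol1" was
// already added to the set (all IDs hypothetical): reserving it for two
// tasks on one node and then releasing one of them leaves it published there.
//
//	vs.reserveVolume("vol1", "task1", "node1", false)
//	vs.reserveVolume("vol1", "task2", "node1", true)
//	// vs.volumes["vol1"].nodes["node1"] == 2
//	vs.releaseVolume("vol1", "task1")
//	// vs.volumes["vol1"].nodes["node1"] == 1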

// freeVolumes finds volumes that are no longer in use on some nodes, and
// updates them to be unpublished from those nodes.
//
// TODO(dperny): this is messy and has a lot of overhead. it should be reworked
// to something more streamlined.
func (vs *volumeSet) freeVolumes(batch *store.Batch) error {
	for volumeID, info := range vs.volumes {
		if err := batch.Update(func(tx store.Tx) error {
			v := store.GetVolume(tx, volumeID)
			if v == nil {
				return nil
			}

			// when we are freeing a volume, we may update more than one of the
			// volume's PublishStatuses. this means we can't simply put the
			// Update call inside of the if statement; we need to know if we've
			// changed anything once we've checked *all* of the statuses.
			changed := false
			for _, status := range v.PublishStatus {
				if info.nodes[status.NodeID] == 0 && status.State == api.VolumePublishStatus_PUBLISHED {
					status.State = api.VolumePublishStatus_PENDING_NODE_UNPUBLISH
					changed = true
				}
			}
			if changed {
				if err := store.UpdateVolume(tx, v); err != nil {
					return err
				}
			}
			return nil
		}); err != nil {
			return err
		}
	}
	return nil
}

// isVolumeAvailableOnNode checks if a volume satisfying the given mount is
// available on the given node.
//
// Returns the ID of the volume available, or an empty string if no such volume
// is found.
func (vs *volumeSet) isVolumeAvailableOnNode(mount *api.Mount, node *NodeInfo) string {
	source := mount.Source
	// first, discern whether we're looking for a group or a volume
	// try trimming off the "group:" prefix. if the resulting string is
	// different from the input string (meaning something has been trimmed),
	// then this volume is actually a volume group.
	if group := strings.TrimPrefix(source, "group:"); group != source {
		ids, ok := vs.byGroup[group]
		// if there are no volumes in this group, then no volume meets the
		// mount criteria.
		if !ok {
			return ""
		}

		// iterate through all ids in the group, checking if any one meets the
		// spec.
		for id := range ids {
			if vs.checkVolume(id, node, mount.ReadOnly) {
				return id
			}
		}
		return ""
	}

	// if it's not a group, it's a name. resolve the volume name to its ID
	id, ok := vs.byName[source]
	if !ok || !vs.checkVolume(id, node, mount.ReadOnly) {
		return ""
	}
	return id
}

// checkVolume checks if an individual volume with the given ID can be placed
// on the given node.
func (vs *volumeSet) checkVolume(id string, info *NodeInfo, readOnly bool) bool {
	vi, ok := vs.volumes[id]
	if !ok || vi.volume == nil {
		// an unknown volume can never be placed; everything below
		// dereferences vi.volume.
		return false
	}
	// first, check if the volume's availability is even Active. if not, no
	// reason to bother with anything further.
	if vi.volume.Spec.Availability != api.VolumeAvailabilityActive {
		return false
	}

	// get the accessible topology for this volume's driver on this node
	var top *api.Topology
	for _, csiInfo := range info.Description.CSIInfo {
		if csiInfo.PluginName == vi.volume.Spec.Driver.Name {
			top = csiInfo.AccessibleTopology
			break
		}
	}

	// check if the volume is available on this node. a volume's
	// availability on a node depends on its accessible topology, how it's
	// already being used, and how this task intends to use it.

	if vi.volume.Spec.AccessMode.Scope == api.VolumeScopeSingleNode {
		// a single-node volume can only be used on one node at a time. if it
		// is already in use on a different node, then it can't be used here.
		for _, usage := range vi.tasks {
			if usage.nodeID != info.ID {
				return false
			}
		}
	}

	// even if the volume is currently on this node, or it has multi-node
	// access, the volume sharing needs to be compatible.
	switch vi.volume.Spec.AccessMode.Sharing {
	case api.VolumeSharingNone:
		// if the volume sharing is none, then the volume cannot be
		// used by another task
		if len(vi.tasks) > 0 {
			return false
		}
	case api.VolumeSharingOneWriter:
		// if the mount is not ReadOnly, and the volume already has a writer,
		// then this volume does not work.
		if !readOnly && hasWriter(vi) {
			return false
		}
	case api.VolumeSharingReadOnly:
		// if the volume sharing is read-only, then the Mount must also
		// be read-only
		if !readOnly {
			return false
		}
	}

	// finally, check the topology: if the volume has an AccessibleTopology,
	// and it does not lie within the node's topology, then this volume won't
	// fit.
	return IsInTopology(top, vi.volume.VolumeInfo.AccessibleTopology)
}

// hasWriter is a helper function that returns true if at least one task is
// using this volume not in read-only mode.
func hasWriter(info volumeInfo) bool {
	for _, usage := range info.tasks {
		if !usage.readOnly {
			return true
		}
	}
	return false
}
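
// as a sketch of the one-writer rule, with hypothetical task and node IDs: a
// volume whose tasks map holds only read-only usages has no writer, so a
// read-write mount still fits; once a read-write usage is recorded, hasWriter
// reports true and further writers are rejected.
//
//	vi := volumeInfo{tasks: map[string]volumeUsage{
//		"task1": {nodeID: "node1", readOnly: true},
//	}}
//	hasWriter(vi) // false: a read-write mount may still be placed
//	vi.tasks["task2"] = volumeUsage{nodeID: "node1", readOnly: false}
//	hasWriter(vi) // true: another writer would be rejected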