File: migration_volumes.go

package info (click to toggle)
incus 6.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 24,392 kB
  • sloc: sh: 16,313; ansic: 3,121; python: 457; makefile: 337; ruby: 51; sql: 50; lisp: 6
file content (322 lines) | stat: -rw-r--r-- 11,023 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
package migration

import (
	"fmt"
	"io"
	"net/http"
	"slices"

	"github.com/lxc/incus/v6/internal/migration"
	backupConfig "github.com/lxc/incus/v6/internal/server/backup/config"
	"github.com/lxc/incus/v6/internal/server/operations"
	"github.com/lxc/incus/v6/shared/api"
	"github.com/lxc/incus/v6/shared/ioprogress"
	"github.com/lxc/incus/v6/shared/units"
)

// Info represents the index frame sent if supported.
type Info struct {
	Config *backupConfig.Config `json:"config,omitempty" yaml:"config,omitempty"` // Equivalent of backup.yaml but embedded in index.
}

// InfoResponse represents the response to the index frame sent if supported.
// Right now this doesn't contain anything useful, its just used to indicate receipt of the index header.
// But in the future the intention is to use it allow the target to send back additional information to the source
// about which frames (such as snapshots) it needs for the migration after having inspected the Info index header.
type InfoResponse struct {
	StatusCode int
	Error      string
	Refresh    *bool // This is used to let the source know whether to actually refresh a volume.
}

// Err returns the error of the response.
func (r *InfoResponse) Err() error {
	if r.StatusCode != http.StatusOK {
		return api.StatusErrorf(r.StatusCode, "%s", r.Error)
	}

	return nil
}

// Type represents the migration transport type. It indicates the method by which the migration can
// take place and what optional features are available.
type Type struct {
	FSType   migration.MigrationFSType // Transport mode selected.
	Features []string                  // Feature hints for selected FSType transport mode.
}

// VolumeSourceArgs represents the arguments needed to setup a volume migration source.
type VolumeSourceArgs struct {
	IndexHeaderVersion uint32
	Name               string
	Snapshots          []string
	MigrationType      Type
	TrackProgress      bool
	MultiSync          bool
	FinalSync          bool
	Data               any // Optional store to persist storage driver state between MultiSync phases.
	ContentType        string
	AllowInconsistent  bool
	Refresh            bool
	Info               *Info
	VolumeOnly         bool
	ClusterMove        bool
	StorageMove        bool
}

// VolumeTargetArgs represents the arguments needed to setup a volume migration sink.
type VolumeTargetArgs struct {
	IndexHeaderVersion    uint32
	Name                  string
	Description           string
	Config                map[string]string // Only used for custom volume migration.
	Snapshots             []*migration.Snapshot
	MigrationType         Type
	TrackProgress         bool
	Refresh               bool
	RefreshExcludeOlder   bool
	Live                  bool
	VolumeSize            int64
	ContentType           string
	VolumeOnly            bool
	ClusterMoveSourceName string
	StoragePool           string
}

// TypesToHeader converts one or more Types to a MigrationHeader. It uses the first type argument
// supplied to indicate the preferred migration method and sets the MigrationHeader's Fs type
// to that. If the preferred type is ZFS then it will also set the header's optional ZfsFeatures.
// If the fallback Rsync type is present in any of the types even if it is not preferred, then its
// optional features are added to the header's RsyncFeatures, allowing for fallback negotiation to
// take place on the farside.
func TypesToHeader(types ...Type) *migration.MigrationHeader {
	missingFeature := false
	hasFeature := true
	var preferredType Type

	if len(types) > 0 {
		preferredType = types[0]
	}

	header := migration.MigrationHeader{Fs: &preferredType.FSType}

	// Add ZFS features if preferred type is ZFS.
	if preferredType.FSType == migration.MigrationFSType_ZFS {
		features := migration.ZfsFeatures{
			Compress: &missingFeature,
		}

		for _, feature := range preferredType.Features {
			if feature == "compress" {
				features.Compress = &hasFeature
			} else if feature == migration.ZFSFeatureMigrationHeader {
				features.MigrationHeader = &hasFeature
			} else if feature == migration.ZFSFeatureZvolFilesystems {
				features.HeaderZvols = &hasFeature
			}
		}

		header.ZfsFeatures = &features
	}

	// Add BTRFS features if preferred type is BTRFS.
	if preferredType.FSType == migration.MigrationFSType_BTRFS {
		features := migration.BtrfsFeatures{
			MigrationHeader:  &missingFeature,
			HeaderSubvolumes: &missingFeature,
		}

		for _, feature := range preferredType.Features {
			if feature == migration.BTRFSFeatureMigrationHeader {
				features.MigrationHeader = &hasFeature
			} else if feature == migration.BTRFSFeatureSubvolumes {
				features.HeaderSubvolumes = &hasFeature
			} else if feature == migration.BTRFSFeatureSubvolumeUUIDs {
				features.HeaderSubvolumeUuids = &hasFeature
			}
		}

		header.BtrfsFeatures = &features
	}

	// Check all the types for an Rsync method, if found add its features to the header's RsyncFeatures list.
	for _, t := range types {
		if t.FSType != migration.MigrationFSType_RSYNC && t.FSType != migration.MigrationFSType_BLOCK_AND_RSYNC {
			continue
		}

		features := migration.RsyncFeatures{
			Xattrs:        &missingFeature,
			Delete:        &missingFeature,
			Compress:      &missingFeature,
			Bidirectional: &missingFeature,
		}

		for _, feature := range t.Features {
			if feature == "xattrs" {
				features.Xattrs = &hasFeature
			} else if feature == "delete" {
				features.Delete = &hasFeature
			} else if feature == "compress" {
				features.Compress = &hasFeature
			} else if feature == "bidirectional" {
				features.Bidirectional = &hasFeature
			}
		}

		header.RsyncFeatures = &features
		break // Only use the first rsync transport type found to generate rsync features list.
	}

	return &header
}

// MatchTypes attempts to find matching migration transport types between an offered type sent from a remote
// source and the types supported by a local storage pool. If matches are found then one or more Types are
// returned containing the method and the matching optional features present in both. The function also takes a
// fallback type which is used as an additional offer type preference in case the preferred remote type is not
// compatible with the local type available. It is expected that both sides of the migration will support the
// fallback type for the volume's content type that is being migrated.
func MatchTypes(offer *migration.MigrationHeader, fallbackType migration.MigrationFSType, ourTypes []Type) ([]Type, error) {
	// Generate an offer types slice from the preferred type supplied from remote and the
	// fallback type supplied based on the content type of the transfer.
	offeredFSTypes := []migration.MigrationFSType{offer.GetFs(), fallbackType}

	matchedTypes := []Type{}

	// Find first matching type.
	for _, ourType := range ourTypes {
		for _, offerFSType := range offeredFSTypes {
			if offerFSType != ourType.FSType {
				continue // Not a match, try the next one.
			}

			// We got a match, now extract the relevant offered features.
			var offeredFeatures []string
			if offerFSType == migration.MigrationFSType_ZFS {
				offeredFeatures = offer.GetZfsFeaturesSlice()
			} else if offerFSType == migration.MigrationFSType_BTRFS {
				offeredFeatures = offer.GetBtrfsFeaturesSlice()
			} else if offerFSType == migration.MigrationFSType_RSYNC {
				offeredFeatures = offer.GetRsyncFeaturesSlice()
			}

			// Find common features in both our type and offered type.
			commonFeatures := []string{}
			for _, ourFeature := range ourType.Features {
				if slices.Contains(offeredFeatures, ourFeature) {
					commonFeatures = append(commonFeatures, ourFeature)
				}
			}

			if offer.GetRefresh() {
				// Optimized refresh with zfs only works if ZfsFeatureMigrationHeader is available.
				if ourType.FSType == migration.MigrationFSType_ZFS && !slices.Contains(commonFeatures, migration.ZFSFeatureMigrationHeader) {
					continue
				}

				// Optimized refresh with btrfs only works if BtrfsFeatureSubvolumeUUIDs is available.
				if ourType.FSType == migration.MigrationFSType_BTRFS && !slices.Contains(commonFeatures, migration.BTRFSFeatureSubvolumeUUIDs) {
					continue
				}
			}

			// Append type with combined features.
			matchedTypes = append(matchedTypes, Type{
				FSType:   ourType.FSType,
				Features: commonFeatures,
			})
		}
	}

	if len(matchedTypes) < 1 {
		// No matching transport type found, generate an error with offered types and our types.
		offeredTypeStrings := make([]string, 0, len(offeredFSTypes))
		for _, offerFSType := range offeredFSTypes {
			offeredTypeStrings = append(offeredTypeStrings, offerFSType.String())
		}

		ourTypeStrings := make([]string, 0, len(ourTypes))
		for _, ourType := range ourTypes {
			ourTypeStrings = append(ourTypeStrings, ourType.FSType.String())
		}

		return matchedTypes, fmt.Errorf("No matching migration types found. Offered types: %v, our types: %v", offeredTypeStrings, ourTypeStrings)
	}

	return matchedTypes, nil
}

func progressWrapperRender(op *operations.Operation, key string, description string, progressInt int64, speedInt int64) {
	meta := op.Metadata()
	if meta == nil {
		meta = make(map[string]any)
	}

	progress := fmt.Sprintf("%s (%s/s)", units.GetByteSizeString(progressInt, 2), units.GetByteSizeString(speedInt, 2))
	if description != "" {
		progress = fmt.Sprintf("%s: %s (%s/s)", description, units.GetByteSizeString(progressInt, 2), units.GetByteSizeString(speedInt, 2))
	}

	if meta[key] != progress {
		meta[key] = progress
		_ = op.UpdateMetadata(meta)
	}
}

// ProgressReader reports the read progress.
func ProgressReader(op *operations.Operation, key string, description string) func(io.ReadCloser) io.ReadCloser {
	return func(reader io.ReadCloser) io.ReadCloser {
		if op == nil {
			return reader
		}

		progress := func(progressInt int64, speedInt int64) {
			progressWrapperRender(op, key, description, progressInt, speedInt)
		}

		readPipe := &ioprogress.ProgressReader{
			ReadCloser: reader,
			Tracker: &ioprogress.ProgressTracker{
				Handler: progress,
			},
		}

		return readPipe
	}
}

// ProgressWriter reports the write progress.
func ProgressWriter(op *operations.Operation, key string, description string) func(io.WriteCloser) io.WriteCloser {
	return func(writer io.WriteCloser) io.WriteCloser {
		if op == nil {
			return writer
		}

		progress := func(progressInt int64, speedInt int64) {
			progressWrapperRender(op, key, description, progressInt, speedInt)
		}

		writePipe := &ioprogress.ProgressWriter{
			WriteCloser: writer,
			Tracker: &ioprogress.ProgressTracker{
				Handler: progress,
			},
		}

		return writePipe
	}
}

// ProgressTracker returns a migration I/O tracker.
func ProgressTracker(op *operations.Operation, key string, description string) *ioprogress.ProgressTracker {
	progress := func(progressInt int64, speedInt int64) {
		progressWrapperRender(op, key, description, progressInt, speedInt)
	}

	tracker := &ioprogress.ProgressTracker{
		Handler: progress,
	}

	return tracker
}