1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
|
package migration
import (
"fmt"
"io"
"net/http"
"slices"
"github.com/lxc/incus/v6/internal/migration"
backupConfig "github.com/lxc/incus/v6/internal/server/backup/config"
"github.com/lxc/incus/v6/internal/server/operations"
"github.com/lxc/incus/v6/shared/api"
"github.com/lxc/incus/v6/shared/ioprogress"
"github.com/lxc/incus/v6/shared/units"
)
// Info represents the index frame sent if supported.
// Its only field is an optional backup configuration, serialized under the "config" key in both JSON and
// YAML and omitted from the output when nil.
type Info struct {
Config *backupConfig.Config `json:"config,omitempty" yaml:"config,omitempty"` // Equivalent of backup.yaml but embedded in index.
}
// InfoResponse represents the response to the index frame sent if supported.
// Right now this doesn't contain anything useful, it's just used to indicate receipt of the index header.
// But in the future the intention is to use it to allow the target to send back additional information to the
// source about which frames (such as snapshots) it needs for the migration after having inspected the Info index
// header.
type InfoResponse struct {
StatusCode int // Status of the response; Err treats any value other than http.StatusOK as a failure.
Error string // Message used by Err to build the returned error when StatusCode is not http.StatusOK.
Refresh *bool // This is used to let the source know whether to actually refresh a volume.
}
// Err converts the response into an error value. It yields nil when StatusCode is http.StatusOK and
// otherwise an api status error built from the response's StatusCode and Error message.
func (r *InfoResponse) Err() error {
	if r.StatusCode == http.StatusOK {
		return nil
	}

	return api.StatusErrorf(r.StatusCode, "%s", r.Error)
}
// Type represents the migration transport type. It indicates the method by which the migration can
// take place and what optional features are available.
// TypesToHeader turns Types into a migration header and MatchTypes returns the Types common to both sides.
type Type struct {
FSType migration.MigrationFSType // Transport mode selected.
Features []string // Feature hints for selected FSType transport mode (for example "compress", "xattrs" or "delete").
}
// VolumeSourceArgs represents the arguments needed to setup a volume migration source.
// NOTE(review): the code consuming these fields is not visible in this section, so only fields whose meaning
// follows from their type or from an existing comment are annotated; confirm the rest against the callers.
type VolumeSourceArgs struct {
IndexHeaderVersion uint32
Name string
Snapshots []string // NOTE(review): presumably snapshot names (VolumeTargetArgs carries []*migration.Snapshot instead) - confirm.
MigrationType Type // Transport mode and feature hints, see Type.
TrackProgress bool
MultiSync bool
FinalSync bool
Data any // Optional store to persist storage driver state between MultiSync phases.
ContentType string
AllowInconsistent bool
Refresh bool
Info *Info // Index frame contents, see Info.
VolumeOnly bool
ClusterMove bool
StorageMove bool
}
// VolumeTargetArgs represents the arguments needed to setup a volume migration sink.
// NOTE(review): the code consuming these fields is not visible in this section, so only fields whose meaning
// follows from their type or from an existing comment are annotated; confirm the rest against the callers.
type VolumeTargetArgs struct {
IndexHeaderVersion uint32
Name string
Description string
Config map[string]string // Only used for custom volume migration.
Snapshots []*migration.Snapshot
MigrationType Type // Transport mode and feature hints, see Type.
TrackProgress bool
Refresh bool
RefreshExcludeOlder bool
Live bool
VolumeSize int64 // NOTE(review): presumably a size in bytes - confirm the unit against the callers.
ContentType string
VolumeOnly bool
ClusterMoveSourceName string
StoragePool string
}
// TypesToHeader builds a MigrationHeader advertising the supplied transport types.
//
// The first type, if any, is the preferred one and its FSType becomes the header's Fs value; with no
// types at all the zero-value Type is used. When the preferred type is ZFS or BTRFS the matching
// ZfsFeatures or BtrfsFeatures section is filled in from its feature hints. Independently of the
// preference, the first type in the list that is RSYNC or BLOCK_AND_RSYNC supplies the header's
// RsyncFeatures, which lets the far side negotiate a fallback to rsync.
func TypesToHeader(types ...Type) *migration.MigrationHeader {
	// Every feature pointer stored in the header refers to one of these two booleans.
	disabled := false
	enabled := true

	var preferred Type
	if len(types) > 0 {
		preferred = types[0]
	}

	header := &migration.MigrationHeader{Fs: &preferred.FSType}

	// Native (ZFS/BTRFS) feature sections are only emitted for the preferred type.
	switch preferred.FSType {
	case migration.MigrationFSType_ZFS:
		// Only Compress gets an explicit "off" default; the other ZFS flags stay unset unless hinted.
		zfs := migration.ZfsFeatures{
			Compress: &disabled,
		}

		for _, name := range preferred.Features {
			switch name {
			case "compress":
				zfs.Compress = &enabled
			case migration.ZFSFeatureMigrationHeader:
				zfs.MigrationHeader = &enabled
			case migration.ZFSFeatureZvolFilesystems:
				zfs.HeaderZvols = &enabled
			}
		}

		header.ZfsFeatures = &zfs

	case migration.MigrationFSType_BTRFS:
		// HeaderSubvolumeUuids has no explicit "off" default and stays unset unless hinted.
		btrfs := migration.BtrfsFeatures{
			MigrationHeader:  &disabled,
			HeaderSubvolumes: &disabled,
		}

		for _, name := range preferred.Features {
			switch name {
			case migration.BTRFSFeatureMigrationHeader:
				btrfs.MigrationHeader = &enabled
			case migration.BTRFSFeatureSubvolumes:
				btrfs.HeaderSubvolumes = &enabled
			case migration.BTRFSFeatureSubvolumeUUIDs:
				btrfs.HeaderSubvolumeUuids = &enabled
			}
		}

		header.BtrfsFeatures = &btrfs
	}

	// Scan every type, preferred or not, for an rsync based transport.
	for _, candidate := range types {
		usesRsync := candidate.FSType == migration.MigrationFSType_RSYNC || candidate.FSType == migration.MigrationFSType_BLOCK_AND_RSYNC
		if !usesRsync {
			continue
		}

		rsync := migration.RsyncFeatures{
			Xattrs:        &disabled,
			Delete:        &disabled,
			Compress:      &disabled,
			Bidirectional: &disabled,
		}

		for _, name := range candidate.Features {
			switch name {
			case "xattrs":
				rsync.Xattrs = &enabled
			case "delete":
				rsync.Delete = &enabled
			case "compress":
				rsync.Compress = &enabled
			case "bidirectional":
				rsync.Bidirectional = &enabled
			}
		}

		header.RsyncFeatures = &rsync

		// Only the first rsync transport type found contributes to the rsync features list.
		break
	}

	return header
}
// MatchTypes negotiates the migration transport types usable by both ends of a transfer.
//
// The remote side's preference is taken from offer's Fs value and fallbackType is added as a second
// acceptable choice; both sides are expected to support the fallback for the content type being
// migrated. Each entry of ourTypes whose FSType equals one of those two choices yields a result Type
// holding that FSType and only the features present both locally and in the offer. Results follow
// the order of ourTypes. When the offer requests a refresh, ZFS matches lacking the migration header
// feature and BTRFS matches lacking the subvolume UUIDs feature are dropped, because the optimized
// refresh depends on them.
//
// An error naming the offered and the local types is returned when nothing matches.
func MatchTypes(offer *migration.MigrationHeader, fallbackType migration.MigrationFSType, ourTypes []Type) ([]Type, error) {
	// Acceptable remote transports: the remote preference first, then the content type's fallback.
	candidates := []migration.MigrationFSType{offer.GetFs(), fallbackType}

	matched := make([]Type, 0)

	for _, local := range ourTypes {
		for _, offered := range candidates {
			if offered != local.FSType {
				continue // Different transport, try the next candidate.
			}

			// Pick the feature list the remote advertised for this transport. Transports other than
			// these three leave the list empty, so no features are negotiated for them.
			var remoteFeatures []string
			switch offered {
			case migration.MigrationFSType_ZFS:
				remoteFeatures = offer.GetZfsFeaturesSlice()
			case migration.MigrationFSType_BTRFS:
				remoteFeatures = offer.GetBtrfsFeaturesSlice()
			case migration.MigrationFSType_RSYNC:
				remoteFeatures = offer.GetRsyncFeaturesSlice()
			}

			// Keep only the features both sides know about, in our own order.
			shared := make([]string, 0)
			for _, feature := range local.Features {
				if !slices.Contains(remoteFeatures, feature) {
					continue
				}

				shared = append(shared, feature)
			}

			if offer.GetRefresh() {
				// Optimized refresh with zfs only works if ZfsFeatureMigrationHeader is available.
				zfsUnusable := local.FSType == migration.MigrationFSType_ZFS && !slices.Contains(shared, migration.ZFSFeatureMigrationHeader)

				// Optimized refresh with btrfs only works if BtrfsFeatureSubvolumeUUIDs is available.
				btrfsUnusable := local.FSType == migration.MigrationFSType_BTRFS && !slices.Contains(shared, migration.BTRFSFeatureSubvolumeUUIDs)

				if zfsUnusable || btrfsUnusable {
					continue
				}
			}

			// Record the transport together with the negotiated feature set.
			matched = append(matched, Type{
				FSType:   local.FSType,
				Features: shared,
			})
		}
	}

	if len(matched) > 0 {
		return matched, nil
	}

	// Nothing in common: report both sides' transports to make the failure diagnosable.
	offeredNames := make([]string, len(candidates))
	for i, fsType := range candidates {
		offeredNames[i] = fsType.String()
	}

	localNames := make([]string, len(ourTypes))
	for i, local := range ourTypes {
		localNames[i] = local.FSType.String()
	}

	return matched, fmt.Errorf("No matching migration types found. Offered types: %v, our types: %v", offeredNames, localNames)
}
// progressWrapperRender publishes a human readable transfer progress string in an operation's metadata.
//
// progressInt and speedInt are each rendered with units.GetByteSizeString (two decimals) as
// "<progress> (<speed>/s)", prefixed with "<description>: " when description is not empty. The string
// is stored under key in the operation's metadata, and the operation is only updated when the stored
// value actually changes.
//
// A nil op is ignored: ProgressTracker forwards its op without checking it, so the handler may be
// invoked with no operation to report against.
func progressWrapperRender(op *operations.Operation, key string, description string, progressInt int64, speedInt int64) {
	if op == nil {
		return
	}

	meta := op.Metadata()
	if meta == nil {
		meta = make(map[string]any)
	}

	// Format the figures once and prepend the description if there is one.
	progress := fmt.Sprintf("%s (%s/s)", units.GetByteSizeString(progressInt, 2), units.GetByteSizeString(speedInt, 2))
	if description != "" {
		progress = description + ": " + progress
	}

	// Skip the metadata update when the rendered text has not changed.
	if meta[key] == progress {
		return
	}

	meta[key] = progress

	// Progress reporting is best effort, a failed metadata update must not interrupt the transfer.
	_ = op.UpdateMetadata(meta)
}
// ProgressReader returns a function that wraps an io.ReadCloser so that read progress is published
// on op under the metadata key, labelled with description.
// When op is nil the returned function hands back the reader it was given, unwrapped.
func ProgressReader(op *operations.Operation, key string, description string) func(io.ReadCloser) io.ReadCloser {
	return func(reader io.ReadCloser) io.ReadCloser {
		// Without an operation there is nowhere to report progress to.
		if op == nil {
			return reader
		}

		return &ioprogress.ProgressReader{
			ReadCloser: reader,
			Tracker: &ioprogress.ProgressTracker{
				Handler: func(transferred int64, rate int64) {
					progressWrapperRender(op, key, description, transferred, rate)
				},
			},
		}
	}
}
// ProgressWriter returns a function that wraps an io.WriteCloser so that write progress is published
// on op under the metadata key, labelled with description.
// When op is nil the returned function hands back the writer it was given, unwrapped.
func ProgressWriter(op *operations.Operation, key string, description string) func(io.WriteCloser) io.WriteCloser {
	return func(writer io.WriteCloser) io.WriteCloser {
		// Without an operation there is nowhere to report progress to.
		if op == nil {
			return writer
		}

		return &ioprogress.ProgressWriter{
			WriteCloser: writer,
			Tracker: &ioprogress.ProgressTracker{
				Handler: func(transferred int64, rate int64) {
					progressWrapperRender(op, key, description, transferred, rate)
				},
			},
		}
	}
}
// ProgressTracker returns a migration I/O tracker whose handler renders the reported progress and
// speed into op's metadata under key, labelled with description.
// No nil check is performed on op here; it is forwarded to progressWrapperRender as given.
func ProgressTracker(op *operations.Operation, key string, description string) *ioprogress.ProgressTracker {
	return &ioprogress.ProgressTracker{
		Handler: func(transferred int64, rate int64) {
			progressWrapperRender(op, key, description, transferred, rate)
		},
	}
}
|