1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
|
package storage
import (
"context"
"fmt"
"net/http"
"slices"
"time"
"github.com/lxc/incus/v6/internal/server/db"
"github.com/lxc/incus/v6/internal/server/db/cluster"
"github.com/lxc/incus/v6/internal/server/instance/instancetype"
"github.com/lxc/incus/v6/internal/server/project"
"github.com/lxc/incus/v6/internal/server/storage/drivers"
"github.com/lxc/incus/v6/shared/api"
"github.com/lxc/incus/v6/shared/logger"
)
// earlyPatches is a registry of storage patches keyed by patch name. Each value is the function that
// applies the patch to a single storage pool backend and returns an error on failure.
// NOTE(review): presumably these are applied at an earlier stage than latePatches — confirm against the
// code that consumes these maps, which is not in this part of the file.
var earlyPatches = map[string]func(b *backend) error{
"storage_missing_snapshot_records": patchMissingSnapshotRecords,
"storage_delete_old_snapshot_records": patchDeleteOldSnapshotRecords,
"storage_prefix_bucket_names_with_project": patchBucketNames,
}
// latePatches is a registry of storage patches keyed by patch name, using the same function signature as
// earlyPatches. No patches are registered in it at present.
var latePatches = map[string]func(b *backend) error{}
// Patches start here.
// patchMissingSnapshotRecords back-fills storage volume snapshot DB records for instance snapshots that
// do not have one. It seems that in 2019 some instance snapshots were created without their associated
// volume DB record, which later caused problems once the instance snapshot DB record count started being
// validated against the volume snapshot DB record count.
//
// Only instances on the local cluster member that are hosted on this pool are considered. An error from any
// lookup or record creation aborts the patch and is returned.
func patchMissingSnapshotRecords(b *backend) error {
	// Resolve the name of the local cluster member.
	var localNode string

	err := b.state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		name, err := tx.GetLocalNodeName(ctx)
		if err != nil {
			return fmt.Errorf("Failed to get local member name: %w", err)
		}

		localNode = name

		return nil
	})
	if err != nil {
		return err
	}

	// repairInstance creates the missing volume snapshot records for one instance.
	repairInstance := func(inst db.InstanceArgs, p api.Project) error {
		// The instance type must map onto a volume type.
		volType, err := InstanceTypeToVolumeType(inst.Type)
		if err != nil {
			return err
		}

		// Virtual machines use block volumes, everything else uses filesystem volumes.
		contentType := drivers.ContentTypeBlock
		if inst.Type != instancetype.VM {
			contentType = drivers.ContentTypeFS
		}

		// Look up the instance's pool and its snapshot DB records.
		var (
			poolName  string
			instSnaps []cluster.Instance
		)

		err = b.state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
			var err error

			poolName, err = tx.GetInstancePool(ctx, p.Name, inst.Name)

			switch {
			case err == nil:
				// Pool found, carry on to the snapshots.
			case api.StatusErrorCheck(err, http.StatusNotFound):
				// An instance with no pool has bigger problems that are outside the scope of
				// this patch. The empty pool name makes the caller skip it.
				return nil
			default:
				return fmt.Errorf("Failed finding pool for instance %q in project %q: %w", inst.Name, p.Name, err)
			}

			instSnaps, err = tx.GetInstanceSnapshotsWithName(ctx, p.Name, inst.Name)

			return err
		})
		if err != nil {
			return err
		}

		// Skip instances that are not hosted on this storage pool.
		if poolName != b.Name() {
			return nil
		}

		dbVol, err := VolumeDBGet(b, p.Name, inst.Name, volType)
		if err != nil {
			return fmt.Errorf("Failed loading storage volume record %q: %w", inst.Name, err)
		}

		// Load the volume snapshot DB records that already exist for the instance.
		dbVolSnaps, err := VolumeDBSnapshotsGet(b, p.Name, inst.Name, volType)
		if err != nil {
			return fmt.Errorf("Failed loading storage volume snapshot records %q: %w", inst.Name, err)
		}

		for i := range instSnaps {
			snap := &instSnaps[i]

			// Does a volume snapshot record with this name already exist?
			recorded := false
			for _, volSnap := range dbVolSnaps {
				if volSnap.Name == snap.Name {
					recorded = true
					break
				}
			}

			if recorded {
				continue
			}

			b.logger.Info("Creating missing volume snapshot record", logger.Ctx{"project": snap.Project, "instance": snap.Name})

			// The new record reuses the parent volume's config and the instance snapshot's creation date.
			err = VolumeDBCreate(b, snap.Project, snap.Name, "Auto repaired", volType, true, dbVol.Config, snap.CreationDate, time.Time{}, contentType, false, true)
			if err != nil {
				return err
			}
		}

		return nil
	}

	// Restrict the listing to instances on this local server (the DB helper functions return volumes on
	// the local server). This also avoids running the same queries on every cluster member for instances
	// on shared storage.
	filter := cluster.InstanceFilter{Node: &localNode}

	return b.state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		return tx.InstanceList(ctx, repairInstance, filter)
	})
}
// patchDeleteOldSnapshotRecords removes the snapshot records still left in the storage_volumes table
// (a previous patch would have already moved them into storage_volumes_snapshots).
//
// A row is deleted when its name equals "<volume name>/<snapshot name>" for a volume/snapshot pair found
// by joining storage_volumes with storage_volumes_snapshots. That inner join is restricted to the local
// node ID and the container volume type.
func patchDeleteOldSnapshotRecords(b *backend) error {
	// The statement text is flush-left on purpose: whitespace inside the raw literal is part of the string.
	const stmt = `
DELETE FROM storage_volumes WHERE id IN (
SELECT id FROM storage_volumes
JOIN (
/* Create a two column intermediary table containing the container name and its snapshot name */
SELECT sv.name inst_name, svs.name snap_name FROM storage_volumes AS sv
JOIN storage_volumes_snapshots AS svs ON sv.id = svs.storage_volume_id AND node_id=? AND type=?
) j1 ON name=printf("%s/%s", j1.inst_name, j1.snap_name)
/* Only keep the records with a matching 'name' pattern, 'node_id' and 'type' */
);
`

	return b.state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		localNodeID := tx.GetNodeID()

		_, execErr := tx.Tx().Exec(stmt, localNodeID, db.StoragePoolVolumeTypeContainer)
		if execErr != nil {
			return fmt.Errorf("Failed to delete remaining instance snapshot records in the `storage_volumes` table: %w", execErr)
		}

		return nil
	})
}
// patchBucketNames switches bucket volumes to the project-prefixed naming convention by renaming each
// bucket volume to the name produced by project.StorageVolume for its project and bucket name.
//
// The patch is a no-op for drivers other than btrfs, dir, lvm and zfs. Volumes that are not of bucket type,
// or whose current name has no matching bucket DB record, are left untouched. The first failing rename
// aborts the patch and its error is returned.
func patchBucketNames(b *backend) error {
	// Only the btrfs, dir, lvm and zfs drivers need this patch.
	switch b.driver.Info().Name {
	case "btrfs", "dir", "lvm", "zfs":
	default:
		return nil
	}

	// Index the local storage buckets by name.
	var bucketsByName map[string]*db.StorageBucket

	err := b.state.DB.Cluster.Transaction(b.state.ShutdownCtx, func(ctx context.Context, tx *db.ClusterTx) error {
		records, err := tx.GetStoragePoolBuckets(ctx, true)
		if err != nil {
			return err
		}

		index := make(map[string]*db.StorageBucket, len(records))
		for _, record := range records {
			index[record.Name] = record
		}

		bucketsByName = index

		return nil
	})
	if err != nil {
		return err
	}

	// Walk the volumes reported by the driver and rename the bucket ones.
	driverVolumes, err := b.driver.ListVolumes()
	if err != nil {
		return err
	}

	for _, vol := range driverVolumes {
		if vol.Type() == drivers.VolumeTypeBucket {
			// Only volumes whose current name matches a known bucket are renamed.
			if bucket, known := bucketsByName[vol.Name()]; known {
				prefixedName := project.StorageVolume(bucket.Project, bucket.Name)

				if err := b.driver.RenameVolume(vol, prefixedName, nil); err != nil {
					return err
				}
			}
		}
	}

	return nil
}
|