package cluster

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"path/filepath"
	"strings"
	"sync/atomic"

	driver "github.com/cowsql/go-cowsql/driver"

	"github.com/lxc/incus/v6/internal/server/db/query"
	"github.com/lxc/incus/v6/internal/server/db/schema"
	daemonUtil "github.com/lxc/incus/v6/internal/server/util"
	internalUtil "github.com/lxc/incus/v6/internal/util"
	"github.com/lxc/incus/v6/internal/version"
	"github.com/lxc/incus/v6/shared/logger"
	"github.com/lxc/incus/v6/shared/osarch"
)
// Open the cluster database object.
//
// The name argument is the name of the cluster database. It defaults to
// 'db.bin', but can be overwritten for testing.
//
// The store argument provides the dqlite node store used to locate the
// current cluster members, and any options are passed through to the dqlite
// driver.
func Open(name string, store driver.NodeStore, options ...driver.Option) (*sql.DB, error) {
	driver, err := driver.New(store, options...)
	if err != nil {
		return nil, fmt.Errorf("Failed to create dqlite driver: %w", err)
	}

	driverName := dqliteDriverName()
	sql.Register(driverName, driver)

	// Create the cluster db. This won't immediately establish any network
	// connection, that will happen only when a db transaction is started
	// (see the database/sql connection pooling code for more details).
	if name == "" {
		name = "db.bin"
	}

	db, err := sql.Open(driverName, name)
	if err != nil {
		return nil, fmt.Errorf("Failed to open cluster database: %w", err)
	}

	return db, nil
}
// EnsureSchema applies all relevant schema updates to the cluster database.
//
// Before actually doing anything, this function will make sure that all nodes
// in the cluster have a schema version and a number of API extensions that
// match our own. If that's not the case, we either return an error (if some
// nodes have a version greater than ours and we need to be upgraded), or
// return false and no error (if some nodes have a lower version, and we need
// to wait until they get upgraded and restarted).
func EnsureSchema(db *sql.DB, address string, dir string) (bool, error) {
	someNodesAreBehind := false
	apiExtensions := version.APIExtensionsCount()

	backupDone := false
	hook := func(ctx context.Context, schemaVersion int, tx *sql.Tx) error {
		// Check if this is a fresh instance.
		isUpdate, err := schema.DoesSchemaTableExist(ctx, tx)
		if err != nil {
			return fmt.Errorf("Failed to check if schema table exists: %w", err)
		}

		if !isUpdate {
			return nil
		}

		// Check if we're clustered.
		clustered := true
		n, err := selectUnclusteredNodesCount(ctx, tx)
		if err != nil {
			return fmt.Errorf("Failed to fetch standalone member count: %w", err)
		}

		if n > 1 {
			// This should never happen, since we only add cluster members with valid addresses.
			return errors.New("Found more than one cluster member with a standalone address (0.0.0.0)")
		} else if n == 1 {
			clustered = false
		}

		// If we're not clustered, backup the local cluster database directory
		// before performing any schema change. This makes sense only in the
		// non-clustered case, because otherwise the directory would be
		// re-populated by replication.
		if !clustered && !backupDone {
			logger.Infof("Updating the global schema. Backup made as \"global.bak\"")
			err := internalUtil.DirCopy(
				filepath.Join(dir, "global"),
				filepath.Join(dir, "global.bak"),
			)
			if err != nil {
				return fmt.Errorf("Failed to backup global database: %w", err)
			}

			backupDone = true
		}

		if schemaVersion == -1 {
			logger.Debugf("Running pre-update queries from file for global DB schema")
		} else {
			logger.Debugf("Updating global DB schema from %d to %d", schemaVersion, schemaVersion+1)
		}

		return nil
	}
	check := func(ctx context.Context, current int, tx *sql.Tx) error {
		// If we're bootstrapping a fresh schema, skip any check, since
		// it's safe to assume we are the only node.
		if current == 0 {
			return nil
		}

		// Check if we're clustered.
		n, err := selectUnclusteredNodesCount(ctx, tx)
		if err != nil {
			return fmt.Errorf("Failed to fetch standalone member count: %w", err)
		}

		if n > 1 {
			// This should never happen, since we only add nodes with valid addresses.
			return errors.New("Found more than one cluster member with a standalone address (0.0.0.0)")
		} else if n == 1 {
			address = "0.0.0.0" // We're not clustered
		}

		// Update the schema and api_extensions columns of ourselves.
		err = updateNodeVersion(tx, address, apiExtensions)
		if err != nil {
			return fmt.Errorf("Failed to update cluster member version info: %w", err)
		}

		err = checkClusterIsUpgradable(ctx, tx, [2]int{len(updates), apiExtensions})
		if errors.Is(err, errSomeNodesAreBehind) {
			someNodesAreBehind = true
			return schema.ErrGracefulAbort
		}

		return err
	}
	schema := Schema()
	schema.File(filepath.Join(dir, "patch.global.sql")) // Optional custom queries
	schema.Check(check)
	schema.Hook(hook)

	var initial int
	err := query.Retry(context.TODO(), func(_ context.Context) error {
		var err error
		initial, err = schema.Ensure(db)
		return err
	})
	if someNodesAreBehind {
		return false, nil
	}

	if err != nil {
		return false, err
	}

	// When creating a database from scratch, insert an entry for node
	// 1. This is needed for referential integrity with other tables. Also,
	// create a default profile.
	if initial == 0 {
		arch, err := osarch.ArchitectureGetLocalID()
		if err != nil {
			return false, err
		}

		err = query.Transaction(context.TODO(), db, func(ctx context.Context, tx *sql.Tx) error {
			stmt := `
INSERT INTO nodes(id, name, address, schema, api_extensions, arch, description) VALUES(1, 'none', '0.0.0.0', ?, ?, ?, '')
`
			_, err = tx.Exec(stmt, SchemaVersion, apiExtensions, arch)
			if err != nil {
				return err
			}

			// Default project.
			var defaultProjectStmt strings.Builder
			_, _ = defaultProjectStmt.WriteString("INSERT INTO projects (name, description) VALUES ('default', 'Default Incus project');")

			// Enable all features for default project.
			for featureName := range ProjectFeatures {
				_, _ = defaultProjectStmt.WriteString(fmt.Sprintf("INSERT INTO projects_config (project_id, key, value) VALUES (1, '%s', 'true');", featureName))
			}

			_, err = tx.Exec(defaultProjectStmt.String())
			if err != nil {
				return err
			}

			// Default profile.
			stmt = `
INSERT INTO profiles (name, description, project_id) VALUES ('default', 'Default Incus profile', 1)
`
			_, err = tx.Exec(stmt)
			if err != nil {
				return err
			}

			// Default cluster group.
			stmt = `
INSERT INTO cluster_groups (name, description) VALUES ('default', 'Default cluster group');
INSERT INTO nodes_cluster_groups (node_id, group_id) VALUES(1, 1);
`
			_, err = tx.Exec(stmt)
			if err != nil {
				return err
			}

			return nil
		})
		if err != nil {
			return false, err
		}
	}

	return true, err
}
// Generate a new name for the dqlite driver registration. We need it to be
// unique for testing, see below.
func dqliteDriverName() string {
	defer atomic.AddUint64(&dqliteDriverSerial, 1)
	return fmt.Sprintf("dqlite-%d", dqliteDriverSerial)
}

// Monotonic serial number for registering new instances of dqlite.Driver
// using the database/sql stdlib package. This is needed since there's no way
// to unregister drivers, and in unit tests more than one driver gets
// registered.
var dqliteDriverSerial uint64
func checkClusterIsUpgradable(ctx context.Context, tx *sql.Tx, target [2]int) error {
	// Get the current versions in the nodes table.
	versions, err := selectNodesVersions(ctx, tx)
	if err != nil {
		return fmt.Errorf("Failed to fetch current nodes versions: %w", err)
	}

	for _, version := range versions {
		// Compare schema versions only.
		n, err := daemonUtil.CompareVersions(target, version, false)
		if err != nil {
			return err
		}

		switch n {
		case 0:
			// Versions are equal, there's hope for the
			// update. Let's check the next node.
			continue
		case 1:
			// Our version is bigger, we should stop here
			// and wait for other nodes to be upgraded and
			// restarted.
			return errSomeNodesAreBehind
		case 2:
			// Another node has a version greater than ours
			// and presumably is waiting for other nodes
			// to upgrade. Let's error out and shut down
			// since we need a greater version.
			return errors.New("This cluster member's version is behind, please upgrade")
		default:
			panic("Unexpected return value from CompareVersions")
		}
	}

	return nil
}
var errSomeNodesAreBehind = errors.New("Some cluster members are behind this cluster member's version")