File: membership.go

package cluster

import (
	"context"
	"crypto/x509"
	"encoding/pem"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"slices"
	"sync"
	"time"

	"github.com/cowsql/go-cowsql/app"
	"github.com/cowsql/go-cowsql/client"

	"github.com/lxc/incus/v6/internal/server/certificate"
	"github.com/lxc/incus/v6/internal/server/db"
	"github.com/lxc/incus/v6/internal/server/db/cluster"
	"github.com/lxc/incus/v6/internal/server/node"
	"github.com/lxc/incus/v6/internal/server/state"
	internalUtil "github.com/lxc/incus/v6/internal/util"
	"github.com/lxc/incus/v6/internal/version"
	"github.com/lxc/incus/v6/shared/logger"
	localtls "github.com/lxc/incus/v6/shared/tls"
	"github.com/lxc/incus/v6/shared/util"
)

// errClusterBusy is returned by dqlite when attempting to join a cluster at the same time as a role change.
// This error tells us we can retry and probably join the cluster, or fail due to something else.
// The error code here is SQLITE_BUSY.
var errClusterBusy = errors.New("A configuration change is already in progress (5)")

// Bootstrap turns a non-clustered server into the first (and leader)
// member of a new cluster.
//
// This instance must already have its cluster.https_address set and be listening
// on the associated network address.
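//
// A rough usage sketch (hypothetical caller; s and gateway are assumed to be
// the daemon's state and cluster gateway, and "server01" the chosen member name):
//
//	err := Bootstrap(s, gateway, "server01")
//	if err != nil {
//		return fmt.Errorf("Failed to bootstrap cluster: %w", err)
//	}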
func Bootstrap(state *state.State, gateway *Gateway, serverName string) error {
	// Check parameters
	if serverName == "" {
		return errors.New("Server name must not be empty")
	}

	err := membershipCheckNoLeftoverClusterCert(state.OS.VarDir)
	if err != nil {
		return err
	}

	var localClusterAddress string

	err = state.DB.Node.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
		// Fetch current network address and raft nodes
		config, err := node.ConfigLoad(ctx, tx)
		if err != nil {
			return fmt.Errorf("Failed to fetch node configuration: %w", err)
		}

		localClusterAddress = config.ClusterAddress()

		// Make sure node-local database state is in order.
		err = membershipCheckNodeStateForBootstrapOrJoin(ctx, tx, localClusterAddress)
		if err != nil {
			return err
		}

		// Add ourselves as first raft node
		err = tx.CreateFirstRaftNode(localClusterAddress, serverName)
		if err != nil {
			return fmt.Errorf("Failed to insert first raft node: %w", err)
		}

		return nil
	})
	if err != nil {
		return err
	}

	// Update our own entry in the nodes table.
	err = state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		// Make sure cluster database state is in order.
		err := membershipCheckClusterStateForBootstrapOrJoin(ctx, tx)
		if err != nil {
			return err
		}

		// Add ourselves to the nodes table.
		err = tx.BootstrapNode(serverName, localClusterAddress)
		if err != nil {
			return fmt.Errorf("Failed updating cluster member: %w", err)
		}

		err = EnsureServerCertificateTrusted(serverName, state.ServerCert(), tx)
		if err != nil {
			return fmt.Errorf("Failed ensuring server certificate is trusted: %w", err)
		}

		return nil
	})
	if err != nil {
		return err
	}

	// Reload the trusted certificate cache to enable the certificate we just added to the local trust store
	// to be used when validating endpoint connections. This will allow Dqlite to connect to ourselves.
	state.UpdateCertificateCache()

	// Shutdown the gateway. This will trash any dqlite connection against
	// our in-memory dqlite driver and shutdown the associated raft
	// instance. We also lock regular access to the cluster database since
	// we don't want any other database code to run while we're
	// reconfiguring raft.
	err = state.DB.Cluster.EnterExclusive()
	if err != nil {
		return fmt.Errorf("Failed to acquire cluster database lock: %w", err)
	}

	err = gateway.Shutdown()
	if err != nil {
		return fmt.Errorf("Failed to shutdown gRPC SQL gateway: %w", err)
	}

	// The cluster CA certificate is a symlink against the regular server CA certificate.
	if util.PathExists(filepath.Join(state.OS.VarDir, "server.ca")) {
		err := os.Symlink("server.ca", filepath.Join(state.OS.VarDir, "cluster.ca"))
		if err != nil {
			return fmt.Errorf("Failed to symlink server CA cert to cluster CA cert: %w", err)
		}
	}

	// Generate a new cluster certificate.
	clusterCert, err := internalUtil.LoadClusterCert(state.OS.VarDir)
	if err != nil {
		return fmt.Errorf("Failed to create cluster cert: %w", err)
	}

	// If endpoint listeners are active, apply new cluster certificate.
	if state.Endpoints != nil {
		gateway.networkCert = clusterCert
		state.Endpoints.NetworkUpdateCert(clusterCert)
	}

	// Re-initialize the gateway. This will create a new raft factory and
	// dqlite driver instance, which will be exposed over gRPC by the
	// gateway handlers.
	err = gateway.init(true)
	if err != nil {
		return fmt.Errorf("Failed to re-initialize gRPC SQL gateway: %w", err)
	}

	err = gateway.WaitLeadership()
	if err != nil {
		return err
	}

	// Make sure we can actually connect to the cluster database through
	// the network endpoint. This also releases the previously acquired
	// lock and makes the Go SQL pooling system invalidate the old
	// connection, so new queries will be executed over the new network
	// connection.
	err = state.DB.Cluster.ExitExclusive(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		_, err := tx.GetNodes(ctx)
		if err != nil {
			return fmt.Errorf("Failed getting cluster members: %w", err)
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("Cluster database initialization failed: %w", err)
	}

	return nil
}

// EnsureServerCertificateTrusted adds the serverCert to the DB trusted certificates store using the serverName.
// If a certificate with the same fingerprint is already in the trust store, but is of the wrong type or name then
// the existing certificate is updated to the correct type and name. If the existing certificate is the correct
// type but the wrong name then an error is returned. And if the existing certificate is the correct type and name
// then nothing more is done.
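//
// A minimal sketch of how it is meant to be called from within a cluster
// transaction (mirroring the call in Bootstrap; s is assumed to be the daemon state):
//
//	err := s.DB.Cluster.Transaction(ctx, func(ctx context.Context, tx *db.ClusterTx) error {
//		return EnsureServerCertificateTrusted("server01", s.ServerCert(), tx)
//	})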
func EnsureServerCertificateTrusted(serverName string, serverCert *localtls.CertInfo, tx *db.ClusterTx) error {
	// Parse our server certificate and prepare to add it to DB trust store.
	serverCertx509, err := x509.ParseCertificate(serverCert.KeyPair().Certificate[0])
	if err != nil {
		return err
	}

	fingerprint := localtls.CertFingerprint(serverCertx509)

	dbCert := cluster.Certificate{
		Fingerprint: fingerprint,
		Type:        certificate.TypeServer, // Server type for intra-member communication.
		Name:        serverName,
		Certificate: string(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: serverCertx509.Raw})),
	}

	// Add our server cert to the DB trust store (so when other members join this cluster they will be
	// able to trust intra-cluster requests from this member).
	ctx := context.Background()
	existingCert, _ := cluster.GetCertificate(ctx, tx.Tx(), dbCert.Fingerprint)
	if existingCert != nil {
		if existingCert.Name != dbCert.Name && existingCert.Type == certificate.TypeServer {
			// Don't alter an existing server certificate that has our fingerprint but not our name.
			// Something is wrong as this shouldn't happen.
			return fmt.Errorf("Existing server certificate with different name %q already in trust store", existingCert.Name)
		} else if existingCert.Name != dbCert.Name && existingCert.Type != certificate.TypeServer {
			// Ensure that if a client certificate already exists that matches our fingerprint, that it
			// has the correct name and type for cluster operation, to allow us to associate member
			// server names to certificate names.
			err = cluster.UpdateCertificate(ctx, tx.Tx(), dbCert.Fingerprint, dbCert)
			if err != nil {
				return fmt.Errorf("Failed updating certificate name and type in trust store: %w", err)
			}
		}
	} else {
		_, err = cluster.CreateCertificate(ctx, tx.Tx(), dbCert)
		if err != nil {
			return fmt.Errorf("Failed adding server certificate to trust store: %w", err)
		}
	}

	return nil
}

// Accept a new node and add it to the cluster.
//
// This instance must already be clustered.
//
// Return an updated list of raft database nodes (possibly including the newly
// accepted node).
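//
// A rough usage sketch on the leader (hypothetical values; schema, api and arch
// are assumed to come from the joining member's request):
//
//	raftNodes, err := Accept(s, gateway, "server02", "10.0.0.2:8443", schema, api, arch)
//	if err != nil {
//		return err
//	}
//	// raftNodes is then handed back to the joining member for use with Join().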
func Accept(state *state.State, gateway *Gateway, name, address string, schema, api, arch int) ([]db.RaftNode, error) {
	// Check parameters
	if name == "" {
		return nil, errors.New("Member name must not be empty")
	}

	if address == "" {
		return nil, errors.New("Member address must not be empty")
	}

	// Insert the new node into the nodes table.
	var id int64
	err := state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		// Check that the node can be accepted with these parameters.
		err := membershipCheckClusterStateForAccept(ctx, tx, name, address, schema, api)
		if err != nil {
			return err
		}

		// Add the new node.
		id, err = tx.CreateNodeWithArch(name, address, arch)
		if err != nil {
			return fmt.Errorf("Failed to insert new node into the database: %w", err)
		}

		// Mark the node as pending, so it will be skipped when
		// performing heartbeats or sending cluster
		// notifications.
		err = tx.SetNodePendingFlag(id, true)
		if err != nil {
			return fmt.Errorf("Failed to mark the new node as pending: %w", err)
		}

		return nil
	})
	if err != nil {
		return nil, err
	}

	// Decide which raft role the new node should get (voter or stand-by if
	// we are below the configured maximums, spare otherwise).
	nodes, err := gateway.currentRaftNodes()
	if err != nil {
		return nil, fmt.Errorf("Failed to get raft nodes from the log: %w", err)
	}

	count := len(nodes) // Existing nodes
	voters := 0
	standbys := 0
	for _, node := range nodes {
		switch node.Role {
		case db.RaftVoter:
			voters++
		case db.RaftStandBy:
			standbys++
		}
	}

	node := db.RaftNode{
		NodeInfo: client.NodeInfo{
			ID:      uint64(id),
			Address: address,
			Role:    db.RaftSpare,
		},
		Name: name,
	}

	if count > 1 && voters < int(state.GlobalConfig.MaxVoters()) {
		node.Role = db.RaftVoter
	} else if standbys < int(state.GlobalConfig.MaxStandBy()) {
		node.Role = db.RaftStandBy
	}

	nodes = append(nodes, node)

	return nodes, nil
}

// Join makes a non-clustered server join an existing cluster.
//
// It's assumed that Accept() was previously called against the leader node,
// which handed out the raft server ID.
//
// The cert parameter must contain the keypair/CA material of the cluster being
// joined.
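//
// A rough usage sketch on the joining member (hypothetical values; clusterCert
// is assumed to hold the cluster keypair/CA and raftNodes the list returned by
// Accept() on the leader):
//
//	err := Join(s, gateway, clusterCert, s.ServerCert(), "server02", raftNodes)
//	if err != nil {
//		return fmt.Errorf("Failed to join cluster: %w", err)
//	}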
func Join(state *state.State, gateway *Gateway, networkCert *localtls.CertInfo, serverCert *localtls.CertInfo, name string, raftNodes []db.RaftNode) error {
	// Check parameters
	if name == "" {
		return errors.New("Member name must not be empty")
	}

	var localClusterAddress string
	err := state.DB.Node.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
		// Fetch current network address and raft nodes
		config, err := node.ConfigLoad(ctx, tx)
		if err != nil {
			return fmt.Errorf("Failed to fetch node configuration: %w", err)
		}

		localClusterAddress = config.ClusterAddress()

		// Make sure node-local database state is in order.
		err = membershipCheckNodeStateForBootstrapOrJoin(ctx, tx, localClusterAddress)
		if err != nil {
			return err
		}

		// Set the raft nodes list to the one that was returned by Accept().
		err = tx.ReplaceRaftNodes(raftNodes)
		if err != nil {
			return fmt.Errorf("Failed to set raft nodes: %w", err)
		}

		return nil
	})
	if err != nil {
		return err
	}

	// Get the local config keys for the cluster pools and networks. It is
	// assumed that the local storage pools and networks match the cluster
	// ones; if not, an error will be returned. Also get any outstanding
	// operations; typically there will be just one, created by the POST
	// /cluster/nodes request which triggered this code.
	var pools map[string]map[string]string
	var networks map[string]map[string]string
	var operations []cluster.Operation

	err = state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		pools, err = tx.GetStoragePoolsLocalConfig(ctx)
		if err != nil {
			return err
		}

		networks, err = tx.GetNetworksLocalConfig(ctx)
		if err != nil {
			return err
		}

		nodeID := tx.GetNodeID()
		filter := cluster.OperationFilter{NodeID: &nodeID}
		operations, err = cluster.GetOperations(ctx, tx.Tx(), filter)
		if err != nil {
			return err
		}

		return nil
	})
	if err != nil {
		return err
	}

	// Lock regular access to the cluster database since we don't want any
	// other database code to run while we're reconfiguring raft.
	err = state.DB.Cluster.EnterExclusive()
	if err != nil {
		return fmt.Errorf("Failed to acquire cluster database lock: %w", err)
	}

	// Shutdown the gateway and wipe any raft data. This will trash any
	// gRPC SQL connection against our in-memory dqlite driver and shutdown
	// the associated raft instance.
	err = gateway.Shutdown()
	if err != nil {
		return fmt.Errorf("Failed to shutdown gRPC SQL gateway: %w", err)
	}

	err = os.RemoveAll(state.OS.GlobalDatabaseDir())
	if err != nil {
		return fmt.Errorf("Failed to remove existing raft data: %w", err)
	}

	// Re-initialize the gateway. This will create a new raft factory and
	// dqlite driver instance, which will be exposed over gRPC by the
	// gateway handlers.
	gateway.networkCert = networkCert
	err = gateway.init(false)
	if err != nil {
		return fmt.Errorf("Failed to re-initialize gRPC SQL gateway: %w", err)
	}

	// If we are listed among the database nodes, join the raft cluster.
	var info *db.RaftNode
	for _, node := range raftNodes {
		if node.Address == localClusterAddress {
			info = &node
		}
	}

	if info == nil {
		panic("Joining member not found")
	}

	logger.Info("Joining dqlite raft cluster", logger.Ctx{"id": info.ID, "local": info.Address, "role": info.Role})
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	client, err := client.FindLeader(
		ctx, gateway.NodeStore(),
		client.WithDialFunc(gateway.raftDial()),
		client.WithLogFunc(DqliteLog),
	)
	if err != nil {
		return fmt.Errorf("Failed to connect to cluster leader: %w", err)
	}

	defer func() { _ = client.Close() }()

	logger.Info("Adding node to cluster", logger.Ctx{"id": info.ID, "local": info.Address, "role": info.Role})
	ctx, cancel = context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// Repeatedly try to join in case the cluster is busy with a role-change.
	joined := false
	for !joined {
		select {
		case <-ctx.Done():
			return fmt.Errorf("Failed to join cluster: %w", ctx.Err())
		default:
			err = client.Add(ctx, info.NodeInfo)
			if err != nil && err.Error() == errClusterBusy.Error() {
				// If the cluster is busy with a role change, sleep a second and then keep trying to join.
				time.Sleep(1 * time.Second)
				continue
			}

			if err != nil {
				return fmt.Errorf("Failed to join cluster: %w", err)
			}

			joined = true
		}
	}

	// Make sure we can actually connect to the cluster database through
	// the network endpoint. This also releases the previously acquired
	// lock and makes the Go SQL pooling system invalidate the old
	// connection, so new queries will be executed over the new gRPC
	// network connection. Also, update the storage_pools and networks
	// tables with our local configuration.
	logger.Info("Migrate local data to cluster database")
	err = state.DB.Cluster.ExitExclusive(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		node, err := tx.GetPendingNodeByAddress(ctx, localClusterAddress)
		if err != nil {
			return fmt.Errorf("Failed to get ID of joining node: %w", err)
		}

		state.DB.Cluster.NodeID(node.ID)
		tx.NodeID(node.ID)

		// Storage pools.
		ids, err := tx.GetNonPendingStoragePoolsNamesToIDs(ctx)
		if err != nil {
			return fmt.Errorf("Failed to get cluster storage pool IDs: %w", err)
		}

		for name, id := range ids {
			err := tx.UpdateStoragePoolAfterNodeJoin(id, node.ID)
			if err != nil {
				return fmt.Errorf("Failed to add joining node's to the pool: %w", err)
			}

			driver, err := tx.GetStoragePoolDriver(ctx, id)
			if err != nil {
				return fmt.Errorf("Failed to get storage pool driver: %w", err)
			}

			// For all pools we add the config provided by the joining node.
			config, ok := pools[name]
			if !ok {
				return fmt.Errorf("Joining member has no config for pool %s", name)
			}

			err = tx.CreateStoragePoolConfig(id, node.ID, config)
			if err != nil {
				return fmt.Errorf("Failed to add joining node's pool config: %w", err)
			}

			if slices.Contains(db.StorageRemoteDriverNames(), driver) {
				// For remote pools we have to create volume entries for the joining node.
				err := tx.UpdateRemoteStoragePoolAfterNodeJoin(ctx, id, node.ID)
				if err != nil {
					return fmt.Errorf("Failed to create remote volumes for joining node: %w", err)
				}
			}
		}

		// Networks.
		netids, err := tx.GetNonPendingNetworkIDs(ctx)
		if err != nil {
			return fmt.Errorf("Failed to get cluster network IDs: %w", err)
		}

		for _, network := range netids {
			for name, id := range network {
				config, ok := networks[name]
				if !ok {
					// Not all networks are present as virtual networks (OVN) don't need entries.
					continue
				}

				err := tx.NetworkNodeJoin(id, node.ID)
				if err != nil {
					return fmt.Errorf("Failed to add joining node's to the network: %w", err)
				}

				err = tx.CreateNetworkConfig(id, node.ID, config)
				if err != nil {
					return fmt.Errorf("Failed to add joining node's network config: %w", err)
				}
			}
		}

		// Migrate outstanding operations.
		for _, operation := range operations {
			op := cluster.Operation{
				UUID:   operation.UUID,
				Type:   operation.Type,
				NodeID: tx.GetNodeID(),
			}

			_, err := cluster.CreateOrReplaceOperation(ctx, tx.Tx(), op)
			if err != nil {
				return fmt.Errorf("Failed to migrate operation %s: %w", operation.UUID, err)
			}
		}

		// Remove the pending flag for ourselves.
		err = tx.SetNodePendingFlag(node.ID, false)
		if err != nil {
			return fmt.Errorf("Failed to unmark the node as pending: %w", err)
		}

		// Set the last heartbeat time to now, as the member is clearly online since it just successfully
		// joined. That way, when we send the notification to all members below, they will consider this member online.
		err = tx.SetNodeHeartbeat(node.Address, time.Now().UTC())
		if err != nil {
			return fmt.Errorf("Failed setting last heartbeat time for member: %w", err)
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("Cluster database initialization failed: %w", err)
	}

	// Generate partial heartbeat request containing just a raft node list.
	if state.Endpoints != nil {
		NotifyHeartbeat(state, gateway)
	}

	return nil
}

// NotifyHeartbeat attempts to send a heartbeat to all other members to notify them of a new or changed member.
func NotifyHeartbeat(state *state.State, gateway *Gateway) {
	// If a heartbeat round is already running (and implicitly this means we are the leader), then cancel it
	// so we can distribute the fresh member state info.
	heartbeatCancel := gateway.HearbeatCancelFunc()
	if heartbeatCancel != nil {
		heartbeatCancel()

		// Wait for heartbeat to finish and then release.
		// Ignore staticcheck "SA2001: empty critical section" because we want to wait for the lock.
		gateway.HeartbeatLock.Lock()
		gateway.HeartbeatLock.Unlock() //nolint:staticcheck
	}

	hbState := NewAPIHearbeat(state.DB.Cluster)
	hbState.Time = time.Now().UTC()

	var err error
	var raftNodes []db.RaftNode
	var localClusterAddress string
	err = state.DB.Node.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
		raftNodes, err = tx.GetRaftNodes(ctx)
		if err != nil {
			return err
		}

		config, err := node.ConfigLoad(ctx, tx)
		if err != nil {
			return err
		}

		localClusterAddress = config.ClusterAddress()

		return nil
	})
	if err != nil {
		logger.Warn("Failed to get current raft members", logger.Ctx{"err": err, "local": localClusterAddress})
		return
	}

	var members []db.NodeInfo
	err = state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		members, err = tx.GetNodes(ctx)
		if err != nil {
			return err
		}

		return nil
	})
	if err != nil {
		logger.Warn("Failed to get current cluster members", logger.Ctx{"err": err, "local": localClusterAddress})
		return
	}

	// Setup a full-state notification heartbeat.
	hbState.Update(true, raftNodes, members, gateway.HeartbeatOfflineThreshold)

	var wg sync.WaitGroup

	// Refresh local event listeners.
	wg.Add(1)
	go func() {
		EventsUpdateListeners(state.Endpoints, state.DB.Cluster, state.ServerCert, hbState.Members, state.Events.Inject)
		wg.Done()
	}()

	// Notify all other members of the change in membership.
	logger.Info("Notifying cluster members of local role change")
	for _, member := range members {
		if member.Address == localClusterAddress {
			continue
		}

		wg.Add(1)
		go func(address string) {
			_ = HeartbeatNode(context.Background(), address, state.Endpoints.NetworkCert(), state.ServerCert(), hbState)
			wg.Done()
		}(member.Address)
	}

	// Wait until all members have been notified (or at least have had a chance to be notified).
	wg.Wait()
}

// Rebalance the raft cluster, trying to see if we have a spare online node
// that we can promote to voter node if we are below membershipMaxRaftVoters,
// or to standby if we are below membershipMaxStandBys.
//
// If there's such a spare node, return its address as well as the new list of
// raft nodes.
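//
// A rough usage sketch on the leader (hypothetical caller):
//
//	address, nodes, err := Rebalance(s, gateway, nil)
//	if err != nil {
//		return err
//	}
//	if address != "" {
//		// The updated nodes list would then be sent to the member at
//		// address, which applies it locally via Assign().
//	}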
func Rebalance(state *state.State, gateway *Gateway, unavailableMembers []string) (string, []db.RaftNode, error) {
	// If we're a standalone node, do nothing.
	if gateway.memoryDial != nil {
		return "", nil, nil
	}

	nodes, err := gateway.currentRaftNodes()
	if err != nil {
		return "", nil, fmt.Errorf("Get current raft nodes: %w", err)
	}

	roles, err := newRolesChanges(state, gateway, nodes, unavailableMembers)
	if err != nil {
		return "", nil, err
	}

	role, candidates := roles.Adjust(gateway.info.ID)

	if role == -1 {
		// No node to promote
		return "", nodes, nil
	}

	// Check if we have a spare node that we can promote to the missing role.
	candidateAddress := candidates[0].Address

	for i, node := range nodes {
		if node.Address == candidateAddress {
			nodes[i].Role = role
			break
		}
	}

	return candidateAddress, nodes, nil
}

// Assign a new role to the local dqlite node.
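//
// A rough usage sketch (hypothetical caller; nodes is assumed to be the raft
// node list received from the leader, e.g. the one produced by Accept() or
// Rebalance()):
//
//	err := Assign(s, gateway, nodes)
//	if err != nil {
//		return fmt.Errorf("Failed to assign database role: %w", err)
//	}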
func Assign(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
	// Figure out our own address.
	address := ""
	err := state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		var err error
		address, err = tx.GetLocalNodeAddress(ctx)
		if err != nil {
			return fmt.Errorf("Failed to fetch the address of this cluster member: %w", err)
		}

		return nil
	})
	if err != nil {
		return err
	}

	// Ensure we actually have an address.
	if address == "" {
		return errors.New("Cluster member is not exposed on the network")
	}

	// Figure out our node identity.
	var info *db.RaftNode
	for i, node := range nodes {
		if node.Address == address {
			info = &nodes[i]
		}
	}

	// Ensure that our address was actually included in the given list of raft nodes.
	if info == nil {
		return errors.New("This member is not included in the given list of database nodes")
	}

	// Replace our local list of raft nodes with the given one (which
	// includes ourselves).
	err = state.DB.Node.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
		err = tx.ReplaceRaftNodes(nodes)
		if err != nil {
			return fmt.Errorf("Failed to set raft nodes: %w", err)
		}

		return nil
	})
	if err != nil {
		return err
	}

	var transactor func(context.Context, func(ctx context.Context, tx *db.ClusterTx) error) error

	// If we are already running a dqlite node, it means we have cleanly
	// joined the cluster before, using the roles support API. In that case
	// there's no need to restart the gateway and we can just change our
	// dqlite role.
	if gateway.IsDqliteNode() {
		transactor = state.DB.Cluster.Transaction
		goto assign
	}

	// If we get here it means that we are an upgraded node from a cluster
	// without roles support, or we didn't cleanly join the cluster. Either
	// way, we don't have a dqlite node running, so we need to restart the
	// gateway.

	// Lock regular access to the cluster database since we don't want any
	// other database code to run while we're reconfiguring raft.
	err = state.DB.Cluster.EnterExclusive()
	if err != nil {
		return fmt.Errorf("Failed to acquire cluster database lock: %w", err)
	}

	transactor = state.DB.Cluster.ExitExclusive

	// Wipe all existing raft data, for good measure (perhaps they were
	// somehow leftover).
	err = os.RemoveAll(state.OS.GlobalDatabaseDir())
	if err != nil {
		return fmt.Errorf("Failed to remove existing raft data: %w", err)
	}

	// Re-initialize the gateway. This will create a new raft factory and
	// dqlite driver instance, which will be exposed over gRPC by the
	// gateway handlers.
	err = gateway.init(false)
	if err != nil {
		return fmt.Errorf("Failed to re-initialize gRPC SQL gateway: %w", err)
	}

assign:
	logger.Info("Changing local database role", logger.Ctx{"role": info.Role})

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	client, err := client.FindLeader(ctx, gateway.NodeStore(), client.WithDialFunc(gateway.raftDial()))
	if err != nil {
		return fmt.Errorf("Connect to cluster leader: %w", err)
	}

	defer func() { _ = client.Close() }()

	// Figure out our current role.
	role := db.RaftRole(-1)
	cluster, err := client.Cluster(ctx)
	if err != nil {
		return fmt.Errorf("Fetch current cluster configuration: %w", err)
	}

	for _, server := range cluster {
		if server.ID == info.ID {
			role = server.Role
			break
		}
	}
	if role == -1 {
		return fmt.Errorf("Node %s does not belong to the current raft configuration", address)
	}

	// If we're stepping back from voter to spare, let's first transition
	// to stand-by and wait for the configuration change to be notified to
	// us. This prevents us from thinking we're still voters and potentially
	// disrupting the cluster.
	if role == db.RaftVoter && info.Role == db.RaftSpare {
		err = client.Assign(ctx, info.ID, db.RaftStandBy)
		if err != nil {
			return fmt.Errorf("Failed to step back to stand-by: %w", err)
		}

		local, err := gateway.getClient()
		if err != nil {
			return fmt.Errorf("Failed to get local dqlite client: %w", err)
		}

		notified := false
		for range 10 {
			time.Sleep(500 * time.Millisecond)
			servers, err := local.Cluster(context.Background())
			if err != nil {
				return fmt.Errorf("Failed to get current cluster: %w", err)
			}

			for _, server := range servers {
				if server.ID != info.ID {
					continue
				}

				if server.Role == db.RaftStandBy {
					notified = true
					break
				}
			}
			if notified {
				break
			}
		}
		if !notified {
			return errors.New("Timeout waiting for configuration change notification")
		}
	}

	// Give the Assign operation a bit more budget in case we're promoting
	// to voter, since that might require a snapshot transfer.
	if info.Role == db.RaftVoter {
		ctx, cancel = context.WithTimeout(context.Background(), 20*time.Second)
		defer cancel()
	}

	err = client.Assign(ctx, info.ID, info.Role)
	if err != nil {
		return fmt.Errorf("Failed to assign role: %w", err)
	}

	gateway.info = info

	// Unlock regular access to our cluster database.
	err = transactor(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		return nil
	})
	if err != nil {
		return fmt.Errorf("Cluster database initialization failed: %w", err)
	}

	// Generate partial heartbeat request containing just a raft node list.
	if state.Endpoints != nil {
		NotifyHeartbeat(state, gateway)
	}

	return nil
}

// Leave a cluster.
//
// If the force flag is true, the node will leave even if it still has
// containers and images.
//
// The node will only leave the raft cluster, and won't be removed from the
// database. That's done by Purge().
//
// Upon success, return the address of the leaving node.
//
// This function must be called by the cluster leader.
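//
// A rough usage sketch on the leader (hypothetical values):
//
//	address, err := Leave(s, gateway, "server02", false, false)
//	if err != nil {
//		return err
//	}
//	// address identifies the departed member; its database record is removed
//	// separately with Purge().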
func Leave(s *state.State, gateway *Gateway, name string, force bool, pending bool) (string, error) {
	logger.Debugf("Make node %s leave the cluster", name)

	// Check if the node can be deleted and track its address.
	var address string
	err := s.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		// Get the node (if it doesn't exist an error is returned).
		var node db.NodeInfo
		var err error
		if pending {
			node, err = tx.GetPendingNodeByName(ctx, name)
			if err != nil {
				return fmt.Errorf("Failed to get member %q: %w", name, err)
			}
		} else {
			node, err = tx.GetNodeByName(ctx, name)
			if err != nil {
				return fmt.Errorf("Failed to get member %q: %w", name, err)
			}
		}

		// Check that the node is eligible for leaving.
		if !force {
			err := membershipCheckClusterStateForLeave(ctx, tx, node.ID)
			if err != nil {
				return err
			}
		}

		address = node.Address
		return nil
	})
	if err != nil {
		return "", err
	}

	nodes, err := gateway.currentRaftNodes()
	if err != nil {
		return "", err
	}

	var info *db.RaftNode // Raft node to remove, if any.
	for i, node := range nodes {
		if node.Address == address {
			info = &nodes[i]
			break
		}
	}

	if info == nil {
		// The node was not part of the raft cluster, nothing left to do.
		return address, nil
	}

	// Remove the member from the dqlite raft configuration.
	logger.Info(
		"Remove node from dqlite raft cluster",
		logger.Ctx{"id": info.ID, "address": info.Address})
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	client, err := gateway.getClient()
	if err != nil {
		return "", fmt.Errorf("Failed to connect to cluster leader: %w", err)
	}

	defer func() { _ = client.Close() }()
	err = client.Remove(ctx, info.ID)
	if err != nil {
		return "", fmt.Errorf("Failed to leave the cluster: %w", err)
	}

	return address, nil
}

// Handover looks for a non-voter member that can be promoted to replace the
// member with the given address, which is shutting down. It returns the
// address of such a member along with an updated list of nodes, with the new
// role set.
//
// It should be called only by the current leader.
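//
// A rough usage sketch on the leader (hypothetical values):
//
//	target, nodes, err := Handover(s, gateway, "10.0.0.2:8443")
//	if err != nil {
//		return err
//	}
//	if target != "" {
//		// nodes carries the updated raft configuration to apply, with
//		// target taking over the role of the member that is shutting down.
//	}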
func Handover(state *state.State, gateway *Gateway, address string) (string, []db.RaftNode, error) {
	nodes, err := gateway.currentRaftNodes()
	if err != nil {
		return "", nil, fmt.Errorf("Get current raft nodes: %w", err)
	}

	var nodeID uint64
	for _, node := range nodes {
		if node.Address == address {
			nodeID = node.ID
		}
	}

	if nodeID == 0 {
		return "", nil, fmt.Errorf("No dqlite node has address %s: %w", address, err)
	}

	roles, err := newRolesChanges(state, gateway, nodes, nil)
	if err != nil {
		return "", nil, err
	}

	role, candidates := roles.Handover(nodeID)
	if role == -1 {
		return "", nil, nil
	}

	for i, node := range nodes {
		if node.Address == candidates[0].Address {
			nodes[i].Role = role
			return node.Address, nodes, nil
		}
	}

	return "", nil, nil
}

// Build an app.RolesChanges object fed with the current cluster state.
func newRolesChanges(state *state.State, gateway *Gateway, nodes []db.RaftNode, unavailableMembers []string) (*app.RolesChanges, error) {
	var domains map[string]uint64
	err := state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		var err error

		domains, err = tx.GetNodesFailureDomains(ctx)
		if err != nil {
			return fmt.Errorf("Load failure domains: %w", err)
		}

		return nil
	})
	if err != nil {
		return nil, err
	}

	cluster := map[client.NodeInfo]*client.NodeMetadata{}

	for _, node := range nodes {
		if !slices.Contains(unavailableMembers, node.Address) && HasConnectivity(gateway.networkCert, gateway.state().ServerCert(), node.Address, false) {
			cluster[node.NodeInfo] = &client.NodeMetadata{
				FailureDomain: domains[node.Address],
			}
		} else {
			cluster[node.NodeInfo] = nil
		}
	}

	roles := &app.RolesChanges{
		Config: app.RolesConfig{
			Voters:   int(state.GlobalConfig.MaxVoters()),
			StandBys: int(state.GlobalConfig.MaxStandBy()),
		},
		State: cluster,
	}

	return roles, nil
}

// Purge removes a node entirely from the cluster database.
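//
// A rough usage sketch (hypothetical values; the cluster database handle is
// assumed to be s.DB.Cluster):
//
//	err := Purge(s.DB.Cluster, "server02", false)
//	if err != nil {
//		return fmt.Errorf("Failed to remove member from database: %w", err)
//	}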
func Purge(c *db.Cluster, name string, pending bool) error {
	logger.Debugf("Remove node %s from the database", name)

	return c.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		// Get the node (if it doesn't exist an error is returned).
		var node db.NodeInfo
		var err error
		if pending {
			node, err = tx.GetPendingNodeByName(ctx, name)
			if err != nil {
				return fmt.Errorf("Failed to get member %q: %w", name, err)
			}
		} else {
			node, err = tx.GetNodeByName(ctx, name)
			if err != nil {
				return fmt.Errorf("Failed to get member %q: %w", name, err)
			}
		}

		err = tx.ClearNode(ctx, node.ID)
		if err != nil {
			return fmt.Errorf("Failed to clear member %q: %w", name, err)
		}

		err = tx.RemoveNode(node.ID)
		if err != nil {
			return fmt.Errorf("Failed to remove member %q: %w", name, err)
		}

		err = cluster.DeleteCertificates(context.Background(), tx.Tx(), name, certificate.TypeServer)
		if err != nil {
			return fmt.Errorf("Failed to remove member %q certificate from trust store: %w", name, err)
		}

		return nil
	})
}

// Count is a convenience for checking the current number of nodes in the
// cluster.
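//
// A rough usage sketch (hypothetical caller):
//
//	n, err := Count(s)
//	if err == nil && n > 1 {
//		// This server is part of a cluster with other members.
//	}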
func Count(state *state.State) (int, error) {
	var count int
	err := state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		var err error
		count, err = tx.GetNodesCount(ctx)
		return err
	})

	return count, err
}

// Enabled is a convenience that returns true if clustering is enabled on this
// node.
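//
// A rough usage sketch (hypothetical caller; s.DB.Node is assumed to be the
// local node database handle):
//
//	clustered, err := Enabled(s.DB.Node)
//	if err != nil {
//		return err
//	}
//	if clustered {
//		// Clustering is enabled on this server.
//	}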
func Enabled(node *db.Node) (bool, error) {
	enabled := false
	err := node.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
		addresses, err := tx.GetRaftNodeAddresses(ctx)
		if err != nil {
			return err
		}

		enabled = len(addresses) > 0
		return nil
	})

	return enabled, err
}

// Check that node-related preconditions are met for bootstrapping or joining a
// cluster.
func membershipCheckNodeStateForBootstrapOrJoin(ctx context.Context, tx *db.NodeTx, address string) error {
	nodes, err := tx.GetRaftNodes(ctx)
	if err != nil {
		return fmt.Errorf("Failed to fetch current raft nodes: %w", err)
	}

	hasClusterAddress := address != ""
	hasRaftNodes := len(nodes) > 0

	// Ensure that we're not in an inconsistent situation, where no cluster address is set, but there are
	// still entries in the raft_nodes table.
	if !hasClusterAddress && hasRaftNodes {
		return errors.New("Inconsistent state: found leftover entries in raft_nodes")
	}

	if !hasClusterAddress {
		return errors.New("No cluster.https_address config is set on this member")
	}

	if hasRaftNodes {
		return errors.New("The member is already part of a cluster")
	}

	return nil
}

// Check that cluster-related preconditions are met for bootstrapping or
// joining a cluster.
func membershipCheckClusterStateForBootstrapOrJoin(ctx context.Context, tx *db.ClusterTx) error {
	members, err := tx.GetNodes(ctx)
	if err != nil {
		return fmt.Errorf("Failed getting cluster members: %w", err)
	}

	if len(members) != 1 {
		return errors.New("Inconsistent state: Found leftover entries in cluster members")
	}

	return nil
}

// Check that cluster-related preconditions are met for accepting a new node.
func membershipCheckClusterStateForAccept(ctx context.Context, tx *db.ClusterTx, name string, address string, schema int, api int) error {
	members, err := tx.GetNodes(ctx)
	if err != nil {
		return fmt.Errorf("Failed getting cluster members: %w", err)
	}

	if len(members) == 1 && members[0].Address == "0.0.0.0" {
		return errors.New("Clustering isn't enabled")
	}

	for _, member := range members {
		if member.Name == name {
			return fmt.Errorf("The cluster already has a member with name: %s", name)
		}

		if member.Address == address {
			return fmt.Errorf("The cluster already has a member with address: %s", address)
		}

		if member.Schema != schema {
			return fmt.Errorf("The joining server version doesn't match (expected %s with DB schema %v)", version.Version, schema)
		}

		if member.APIExtensions != api {
			return fmt.Errorf("The joining server version doesn't match (expected %s with API count %v)", version.Version, api)
		}
	}

	return nil
}

// Check that cluster-related preconditions are met for leaving a cluster.
func membershipCheckClusterStateForLeave(ctx context.Context, tx *db.ClusterTx, nodeID int64) error {
	// Check that it has no containers or images.
	message, err := tx.NodeIsEmpty(ctx, nodeID)
	if err != nil {
		return err
	}

	if message != "" {
		return errors.New(message)
	}

	// Check that it's not the last member.
	members, err := tx.GetNodes(ctx)
	if err != nil {
		return fmt.Errorf("Failed getting cluster members: %w", err)
	}

	if len(members) == 1 {
		return errors.New("Member is the only member in the cluster")
	}

	return nil
}

// Check that there is no left-over cluster certificate in the var dir of this server.
func membershipCheckNoLeftoverClusterCert(dir string) error {
	// Ensure that there's no leftover cluster certificate.
	for _, basename := range []string{"cluster.crt", "cluster.key", "cluster.ca"} {
		if util.PathExists(filepath.Join(dir, basename)) {
			return errors.New("Inconsistent state: found leftover cluster certificate")
		}
	}

	return nil
}

// SchemaVersion holds the version of the cluster database schema.
var SchemaVersion = cluster.SchemaVersion