File: notify.go

package info (click to toggle)
incus 6.0.5-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 24,428 kB
  • sloc: sh: 16,313; ansic: 3,121; python: 457; makefile: 337; ruby: 51; sql: 50; lisp: 6
file content (124 lines) | stat: -rw-r--r-- 3,499 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package cluster

import (
	"context"
	"fmt"
	"sync"
	"time"

	incus "github.com/lxc/incus/v6/client"
	"github.com/lxc/incus/v6/internal/server/db"
	"github.com/lxc/incus/v6/internal/server/state"
	"github.com/lxc/incus/v6/shared/logger"
	localtls "github.com/lxc/incus/v6/shared/tls"
)

// Notifier is a function that invokes the given hook against each node in
// the cluster excluding the invoking one.
//
// The hook is called with a client connected to one peer at a time and
// should return a non-nil error if notifying that peer failed. The Notifier
// returns nil when every (non-ignored) hook call succeeded.
type Notifier func(hook func(incus.InstanceServer) error) error

// NotifierPolicy can be used to tweak the behavior of NewNotifier when some
// nodes are down.
type NotifierPolicy int

// Possible notification policies.
const (
	// NotifyAll requires that all nodes are up: NewNotifier fails if a node
	// that looks offline by heartbeat is also unreachable on a direct probe.
	NotifyAll NotifierPolicy = iota

	// NotifyAlive only notifies nodes that are alive: unreachable offline
	// nodes are skipped, and connection errors while notifying are logged
	// rather than returned.
	NotifyAlive

	// NotifyTryAll attempts to notify all nodes regardless of their state,
	// returning any error encountered.
	NotifyTryAll
)

// NewNotifier builds a Notifier that can be used to notify other cluster
// members using the given policy.
//
// When this server has no cluster address configured, a no-op Notifier is
// returned. Otherwise the list of peers is computed once, here, from the
// cluster database: every member except this one (and any member whose
// address is "0.0.0.0") is a candidate.
//
// Members considered offline according to the configured offline threshold
// are probed directly, unless the policy is NotifyTryAll (which keeps them
// regardless, so the probe could not change the outcome). For an offline
// member that fails the probe, NotifyAll makes NewNotifier return an error
// and NotifyAlive drops the member from the peer list.
//
// The returned Notifier runs the hook concurrently against every peer and
// waits for all of them to finish. It returns the first failure in peer
// order; under NotifyAlive, connection errors are logged and ignored.
func NewNotifier(state *state.State, networkCert *localtls.CertInfo, serverCert *localtls.CertInfo, policy NotifierPolicy) (Notifier, error) {
	localClusterAddress := state.LocalConfig.ClusterAddress()

	// Fast-track the case where we're not clustered at all.
	if localClusterAddress == "" {
		nullNotifier := func(func(incus.InstanceServer) error) error { return nil }
		return nullNotifier, nil
	}

	var members []db.NodeInfo
	var offlineThreshold time.Duration
	err := state.DB.Cluster.Transaction(context.TODO(), func(ctx context.Context, tx *db.ClusterTx) error {
		var err error

		offlineThreshold, err = tx.GetNodeOfflineThreshold(ctx)
		if err != nil {
			return err
		}

		members, err = tx.GetNodes(ctx)
		if err != nil {
			return fmt.Errorf("Failed getting cluster members: %w", err)
		}

		return nil
	})
	if err != nil {
		return nil, err
	}

	peers := make([]string, 0, len(members))
	for _, member := range members {
		if member.Address == localClusterAddress || member.Address == "0.0.0.0" {
			continue // Exclude ourselves
		}

		// Even if the heartbeat timestamp is not recent enough, try to
		// connect to the member, just in case the heartbeat is lagging
		// behind and the member is actually up. NotifyTryAll keeps the
		// member whatever the probe says, so skip the probe in that case.
		if policy != NotifyTryAll && member.IsOffline(offlineThreshold) && !HasConnectivity(networkCert, serverCert, member.Address, true) {
			switch policy {
			case NotifyAll:
				return nil, fmt.Errorf("peer node %s is down", member.Address)
			case NotifyAlive:
				continue // Just skip this member.
			}
		}

		peers = append(peers, member.Address)
	}

	notifier := func(hook func(incus.InstanceServer) error) error {
		errs := make([]error, len(peers))

		wg := sync.WaitGroup{}
		wg.Add(len(peers))
		for i, address := range peers {
			logger.Debugf("Notify node %s of state changes", address)
			go func(i int, address string) {
				defer wg.Done()

				client, err := Connect(address, networkCert, serverCert, nil, true)
				if err != nil {
					errs[i] = fmt.Errorf("failed to connect to peer %s: %w", address, err)
					return
				}

				err = hook(client)
				if err != nil {
					errs[i] = fmt.Errorf("failed to notify peer %s: %w", address, err)
				}
			}(i, address)
		}

		wg.Wait()

		// Each goroutine writes only its own slot, so errs is safe to read
		// once Wait returns. Only the first non-ignorable error (in peer
		// order) is returned to the caller.
		for i, err := range errs {
			if err == nil {
				continue
			}

			if policy == NotifyAlive && localtls.IsConnectionError(err) {
				logger.Warnf("Could not notify node %s: %v", peers[i], err)
				continue
			}

			return err
		}

		return nil
	}

	return notifier, nil
}