File: options.go

package info (click to toggle)
golang-github-canonical-go-dqlite 2.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 712 kB
  • sloc: sh: 380; makefile: 5
file content (334 lines) | stat: -rw-r--r-- 10,021 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
package app

import (
	"crypto/tls"
	"fmt"
	"log"
	"net"
	"strings"
	"time"

	"github.com/canonical/go-dqlite/v2"
	"github.com/canonical/go-dqlite/v2/client"
	"github.com/canonical/go-dqlite/v2/internal/protocol"
)

// Option is a function that tweaks app parameters by modifying the options
// value it is handed.
type Option func(*options)

// WithAddress configures the network address of this application node.
//
// The address must be reachable by the other application nodes, which use it
// to connect to this one, and it must stay the same across application
// restarts.
//
// For a node that is not the first one in the cluster, the address has to be
// the same value that was passed to the App.Add() method when the node was
// registered.
//
// When this option is not given, the first non-loopback IP address of any of
// the system network interfaces is used, with port 9000.
func WithAddress(address string) Option {
	return func(opts *options) {
		opts.Address = address
	}
}

// WithCluster supplies the addresses of one or more application nodes that
// are already part of the cluster.
//
// It must be used the first time a newly added application node is started.
func WithCluster(cluster []string) Option {
	return func(opts *options) {
		opts.Cluster = cluster
	}
}

// WithExternalConn installs an external dial function, to be used whenever
// dqlite needs to make an outside connection.
//
// It also takes a channel of net.Conn values, from which connections should
// be received once they have been accepted externally.
func WithExternalConn(dialFunc client.DialFunc, acceptCh chan net.Conn) Option {
	return func(opts *options) {
		setup := new(connSetup)
		setup.dialFunc = dialFunc
		setup.acceptCh = acceptCh
		opts.Conn = setup
	}
}

// WithTLS turns on TLS encryption of network traffic.
//
// The "listen" parameter holds the TLS configuration used when accepting
// incoming connections from clients or application nodes.
//
// The "dial" parameter holds the TLS configuration used when establishing
// outgoing connections to other application nodes.
func WithTLS(listen *tls.Config, dial *tls.Config) Option {
	return func(opts *options) {
		setup := new(tlsSetup)
		setup.Listen = listen
		setup.Dial = dial
		opts.TLS = setup
	}
}

// WithUnixSocket selects a specific socket path for the communication between
// go-dqlite and dqlite.
//
// The default is the empty string, meaning a random abstract unix socket.
func WithUnixSocket(path string) Option {
	return func(opts *options) {
		opts.UnixSocket = path
	}
}

// WithVoters configures how many nodes in the cluster should hold the Voter
// role.
//
// A node that is newly added to the cluster, or that is started again after a
// shutdown, is assigned the Voter role if the current number of voters is
// below n.
//
// Likewise, a node holding the Voter role that is shut down gracefully by
// calling the Handover() method tries to transfer its Voter role to another
// non-Voter node, if one is available.
//
// Every App instance in a cluster must be created with the same WithVoters
// setting.
//
// The given value must be an odd number greater than one.
//
// The default value is 3.
func WithVoters(n int) Option {
	return func(opts *options) {
		opts.Voters = n
	}
}

// WithStandBys configures how many nodes in the cluster should hold the
// StandBy role.
//
// A node that is newly added to the cluster, or that is started again after a
// shutdown, is assigned the StandBy role if there are already enough online
// voters but the current number of stand-bys is below n.
//
// Likewise, a node holding the StandBy role that is shut down gracefully by
// calling the Handover() method tries to transfer its StandBy role to another
// non-StandBy node, if one is available.
//
// Every App instance in a cluster must be created with the same WithStandBys
// setting.
//
// The default value is 3.
func WithStandBys(n int) Option {
	return func(opts *options) {
		opts.StandBys = n
	}
}

// WithRolesAdjustmentFrequency configures how often the current cluster
// leader checks whether the roles of the various nodes in the cluster match
// the desired setup, performing promotions/demotions to adjust the situation
// if needed.
//
// The default is 30 seconds.
func WithRolesAdjustmentFrequency(frequency time.Duration) Option {
	return func(opts *options) {
		opts.RolesAdjustmentFrequency = frequency
	}
}

// WithRolesAdjustmentHook registers a hook that will be run each time the
// roles are adjusted, as controlled by WithRolesAdjustmentFrequency. The hook
// is provided with the current raft leader information as well as the most up
// to date list of cluster members and their roles.
func WithRolesAdjustmentHook(hook func(leader client.NodeInfo, cluster []client.NodeInfo) error) Option {
	return func(opts *options) {
		opts.OnRolesAdjustment = hook
	}
}

// WithLogFunc installs a custom log function in place of the default one.
func WithLogFunc(log client.LogFunc) Option {
	return func(opts *options) {
		opts.Log = log
	}
}

// WithTracing causes a log message to be emitted at the given level every
// time a statement gets executed.
func WithTracing(level client.LogLevel) Option {
	return func(opts *options) {
		opts.Tracing = level
	}
}

// WithFailureDomain configures the failure domain of the node.
//
// Failure domains are taken into account when deciding which nodes to promote
// to Voter or StandBy when needed.
func WithFailureDomain(code uint64) Option {
	return func(opts *options) {
		opts.FailureDomain = code
	}
}

// WithNetworkLatency configures the average one-way network latency.
func WithNetworkLatency(latency time.Duration) Option {
	return func(opts *options) {
		opts.NetworkLatency = latency
	}
}

// WithConcurrentLeaderConns configures the maximum number of concurrent
// connections to other cluster members that will be attempted while searching
// for the dqlite leader.
//
// The limit is passed as a pointer to an integer so that the value can be
// dynamically modified based on cluster health.
//
// The default is 10 connections to other cluster members.
func WithConcurrentLeaderConns(maxConns *int64) Option {
	return func(opts *options) {
		opts.ConcurrentLeaderConns = maxConns
	}
}

// WithSnapshotParams configures the raft snapshot parameters.
func WithSnapshotParams(params dqlite.SnapshotParams) Option {
	return func(opts *options) {
		opts.SnapshotParams = params
	}
}

// WithDiskMode turns disk-mode on or off.
//
// WARNING: This is an experimental API, use it with caution and be prepared
// for data loss.
// UNSTABLE: The behavior can change in the future.
// NOT RECOMMENDED for production use-cases, use at your own risk.
func WithDiskMode(disk bool) Option {
	return func(opts *options) {
		opts.DiskMode = disk
	}
}

// WithAutoRecovery turns auto-recovery of persisted data at startup on or off
// for this node.
//
// With auto-recovery enabled, raft snapshots and segment files may be deleted
// at startup if they are determined to be corrupt. This lets the startup
// process succeed in more cases, but can lead to data loss.
//
// Auto-recovery is enabled by default.
func WithAutoRecovery(recovery bool) Option {
	return func(opts *options) {
		opts.AutoRecovery = recovery
	}
}

// tlsSetup bundles the two TLS configurations installed by WithTLS: Listen is
// used when accepting incoming connections, Dial when establishing outgoing
// ones.
type tlsSetup struct {
	Listen *tls.Config
	Dial   *tls.Config
}

// connSetup bundles the values installed by WithExternalConn: the external
// dial function and the channel of externally accepted connections.
type connSetup struct {
	dialFunc client.DialFunc
	acceptCh chan net.Conn
}

// options collects the settings that can be tweaked through Option functions.
// Initial values are provided by defaultOptions.
//
// Each field is set by the option listed next to it:
//
//   - Address: WithAddress
//   - Cluster: WithCluster
//   - Log: WithLogFunc
//   - Tracing: WithTracing
//   - TLS: WithTLS (nil unless the option is used)
//   - Conn: WithExternalConn (nil unless the option is used)
//   - Voters: WithVoters
//   - StandBys: WithStandBys
//   - RolesAdjustmentFrequency: WithRolesAdjustmentFrequency
//   - OnRolesAdjustment: WithRolesAdjustmentHook
//   - FailureDomain: WithFailureDomain
//   - NetworkLatency: WithNetworkLatency
//   - ConcurrentLeaderConns: WithConcurrentLeaderConns
//   - UnixSocket: WithUnixSocket
//   - SnapshotParams: WithSnapshotParams
//   - DiskMode: WithDiskMode
//   - AutoRecovery: WithAutoRecovery
type options struct {
	Address                  string
	Cluster                  []string
	Log                      client.LogFunc
	Tracing                  client.LogLevel
	TLS                      *tlsSetup
	Conn                     *connSetup
	Voters                   int
	StandBys                 int
	RolesAdjustmentFrequency time.Duration
	OnRolesAdjustment        func(client.NodeInfo, []client.NodeInfo) error
	FailureDomain            uint64
	NetworkLatency           time.Duration
	ConcurrentLeaderConns    *int64
	UnixSocket               string
	SnapshotParams           dqlite.SnapshotParams
	DiskMode                 bool
	AutoRecovery             bool
}

// defaultOptions returns an options object populated with sane defaults.
//
// Fields that are not assigned here keep their zero value.
func defaultOptions() *options {
	// A local copy is needed so that its address can be stored in the options.
	leaderConns := protocol.MaxConcurrentLeaderConns

	// The default roles adjustment hook does nothing and never fails.
	noopHook := func(client.NodeInfo, []client.NodeInfo) error { return nil }

	opts := new(options)
	opts.Log = defaultLogFunc
	opts.Tracing = client.LogNone
	opts.Voters = 3
	opts.StandBys = 3
	opts.RolesAdjustmentFrequency = 30 * time.Second
	opts.OnRolesAdjustment = noopHook
	opts.DiskMode = false // Be explicit about not enabling disk-mode by default.
	opts.AutoRecovery = true
	opts.ConcurrentLeaderConns = &leaderConns
	return opts
}

// isLoopback reports whether the given network interface has the loopback
// flag set.
func isLoopback(iface *net.Interface) bool {
	return iface.Flags&net.FlagLoopback != 0
}

// isIpV4 reports whether ip is written in an IPv4 notation. The check is
// purely syntactic: a string containing fewer than two colons is treated as
// IPv4, anything else as IPv6.
//
// See https://stackoverflow.com/a/48519490/3613657
//
// IPv4 notations (at most one colon):
//
//    "192.168.0.1": basic
//    "192.168.0.1:80": with port info
//
// IPv6 notations (two or more colons):
//
//    "::FFFF:C0A8:1": basic
//    "::FFFF:C0A8:0001": leading zeros
//    "0000:0000:0000:0000:0000:FFFF:C0A8:1": double colon expanded
//    "::FFFF:C0A8:1%1": with zone info
//    "::FFFF:192.168.0.1": IPv4 literal
//    "[::FFFF:C0A8:1]:80": with port info
//    "[::FFFF:C0A8:1%1]:80": with zone and port info
func isIpV4(ip string) bool {
	// The first and last colon coincide exactly when there is at most one
	// colon in the string (both indexes are -1 when there is none).
	return strings.IndexByte(ip, ':') == strings.LastIndexByte(ip, ':')
}

// defaultAddress picks the default network address of the application node.
//
// It walks the system network interfaces in the order reported by
// net.Interfaces, skipping loopback interfaces, interfaces whose addresses
// cannot be listed or that have none, and interfaces whose first address is
// not a *net.IPNet. The IP of the first address of the first remaining
// interface is returned with port 9000 appended, wrapped in square brackets
// when it is not in IPv4 notation.
//
// An error is returned if the interfaces cannot be enumerated or if none of
// them qualifies. In the latter case the last error hit while listing the
// addresses of an interface, if any, is wrapped in the returned error.
func defaultAddress() (string, error) {
	ifaces, err := net.Interfaces()
	if err != nil {
		return "", err
	}

	// Remember the most recent per-interface failure, so that it can be
	// reported if no usable address is found at all.
	var lastErr error
	for i := range ifaces {
		iface := &ifaces[i]
		if isLoopback(iface) {
			continue
		}
		addrs, addrsErr := iface.Addrs()
		if addrsErr != nil {
			lastErr = addrsErr
			continue
		}
		if len(addrs) == 0 {
			continue
		}
		// Only the first address of each interface is considered.
		ipNet, ok := addrs[0].(*net.IPNet)
		if !ok {
			continue
		}
		ip := ipNet.IP.String()
		if isIpV4(ip) {
			return ip + ":9000", nil
		}
		return "[" + ip + "]:9000", nil
	}

	if lastErr != nil {
		return "", fmt.Errorf("no suitable net.Interface found: %w", lastErr)
	}
	return "", fmt.Errorf("no suitable net.Interface found")
}

// defaultLogFunc is the log function installed by defaultOptions. Messages
// whose level is not client.LogError are dropped; error messages are
// formatted, prefixed with the level in square brackets and " dqlite: ", and
// written through the standard library logger.
func defaultLogFunc(l client.LogLevel, format string, a ...interface{}) {
	if l == client.LogError {
		prefixed := "[" + l.String() + "]" + " dqlite: " + format
		log.Printf("%s", fmt.Sprintf(prefixed, a...))
	}
}