1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
|
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package raft
import (
"fmt"
"io"
"os"
"time"
"github.com/hashicorp/go-hclog"
)
// ProtocolVersion is the version of the protocol (which includes RPC messages
// as well as Raft-specific log entries) that this server can _understand_. Use
// the ProtocolVersion member of the Config object to control the version of
// the protocol to use when _speaking_ to other servers. Note that depending on
// the protocol version being spoken, some otherwise understood RPC messages
// may be refused. See dispositionRPC for details of this logic.
//
// There are notes about the upgrade path in the description of the versions
// below. If you are starting a fresh cluster then there's no reason not to
// jump right to the latest protocol version. If you need to interoperate with
// older, version 0 Raft servers you'll need to drive the cluster through the
// different versions in order.
//
// The version details are complicated, but here's a summary of what's required
// to get from a version 0 cluster to version 3:
//
// 1. In version N of your app that starts using the new Raft library with
// versioning, set ProtocolVersion to 1.
// 2. Make version N+1 of your app require version N as a prerequisite (all
// servers must be upgraded). For version N+1 of your app set ProtocolVersion
// to 2.
// 3. Similarly, make version N+2 of your app require version N+1 as a
// prerequisite. For version N+2 of your app, set ProtocolVersion to 3.
//
// During this upgrade, older cluster members will still have Server IDs equal
// to their network addresses. To upgrade an older member and give it an ID, it
// needs to leave the cluster and re-enter:
//
// 1. Remove the server from the cluster with RemoveServer, using its network
// address as its ServerID.
// 2. Update the server's config to use a UUID or something else that is
// not tied to the machine as the ServerID (restarting the server).
// 3. Add the server back to the cluster with AddVoter, using its new ID.
//
// You can do this during the rolling upgrade from N+1 to N+2 of your app, or
// as a rolling change at any time after the upgrade.
//
// # Version History
//
// 0: Original Raft library before versioning was added. Servers running this
//
// version of the Raft library use AddPeerDeprecated/RemovePeerDeprecated
// for all configuration changes, and have no support for LogConfiguration.
//
// 1: First versioned protocol, used to interoperate with old servers, and begin
//
// the migration path to newer versions of the protocol. Under this version
// all configuration changes are propagated using the now-deprecated
// RemovePeerDeprecated Raft log entry. This means that server IDs are always
// set to be the same as the server addresses (since the old log entry type
// cannot transmit an ID), and only AddPeer/RemovePeer APIs are supported.
// Servers running this version of the protocol can understand the new
// LogConfiguration Raft log entry but will never generate one so they can
// remain compatible with version 0 Raft servers in the cluster.
//
// 2: Transitional protocol used when migrating an existing cluster to the new
//
// server ID system. Server IDs are still set to be the same as server
// addresses, but all configuration changes are propagated using the new
// LogConfiguration Raft log entry type, which can carry full ID information.
// This version supports the old AddPeer/RemovePeer APIs as well as the new
// ID-based AddVoter/RemoveServer APIs which should be used when adding
// version 3 servers to the cluster later. This version sheds all
// interoperability with version 0 servers, but can interoperate with newer
// Raft servers running with protocol version 1 since they can understand the
// new LogConfiguration Raft log entry, and this version can still understand
// their RemovePeerDeprecated Raft log entries. We need this protocol version
// as an intermediate step between 1 and 3 so that servers will propagate the
// ID information that will come from newly-added (or -rolled) servers using
// protocol version 3, but since they are still using their address-based IDs
// from the previous step they will still be able to track commitments and
// their own voting status properly. If we skipped this step, servers would
// be started with their new IDs, but they wouldn't see themselves in the old
// address-based configuration, so none of the servers would think they had a
// vote.
//
// 3: Protocol adding full support for server IDs and new ID-based server APIs
//
// (AddVoter, AddNonvoter, etc.), old AddPeer/RemovePeer APIs are no longer
// supported. Version 2 servers should be swapped out by removing them from
// the cluster one-by-one and re-adding them with updated configuration for
// this protocol version, along with their server ID. The remove/add cycle
// is required to populate their server ID. Note that removing must be done
// by ID, which will be the old server's address.
type ProtocolVersion int
const (
// ProtocolVersionMin is the minimum protocol version
ProtocolVersionMin ProtocolVersion = 0
// ProtocolVersionMax is the maximum protocol version
ProtocolVersionMax = 3
)
// SnapshotVersion is the version of snapshots that this server can understand.
// Currently, it is always assumed that the server generates the latest version,
// though this may be changed in the future to include a configurable version.
//
// # Version History
//
// 0: Original Raft library before versioning was added. The peers portion of
//
// these snapshots is encoded in the legacy format which requires decodePeers
// to parse. This version of snapshots should only be produced by the
// unversioned Raft library.
//
// 1: New format which adds support for a full configuration structure and its
//
// associated log index, with support for server IDs and non-voting server
// modes. To ease upgrades, this also includes the legacy peers structure but
// that will never be used by servers that understand version 1 snapshots.
// Since the original Raft library didn't enforce any versioning, we must
// include the legacy peers structure for this version, but we can deprecate
// it in the next snapshot version.
type SnapshotVersion int
const (
// SnapshotVersionMin is the minimum snapshot version
SnapshotVersionMin SnapshotVersion = 0
// SnapshotVersionMax is the maximum snapshot version
SnapshotVersionMax = 1
)
// Config provides any necessary configuration for the Raft server.
type Config struct {
// ProtocolVersion allows a Raft server to inter-operate with older
// Raft servers running an older version of the code. This is used to
// version the wire protocol as well as Raft-specific log entries that
// the server uses when _speaking_ to other servers. There is currently
// no auto-negotiation of versions so all servers must be manually
// configured with compatible versions. See ProtocolVersionMin and
// ProtocolVersionMax for the versions of the protocol that this server
// can _understand_.
ProtocolVersion ProtocolVersion
// HeartbeatTimeout specifies the time in follower state without contact
// from a leader before we attempt an election.
HeartbeatTimeout time.Duration
// ElectionTimeout specifies the time in candidate state without contact
// from a leader before we attempt an election.
ElectionTimeout time.Duration
// CommitTimeout specifies the time without an Apply operation before the
// leader sends an AppendEntry RPC to followers, to ensure a timely commit of
// log entries.
// Due to random staggering, may be delayed as much as 2x this value.
CommitTimeout time.Duration
// MaxAppendEntries controls the maximum number of append entries
// to send at once. We want to strike a balance between efficiency
// and avoiding waste if the follower is going to reject because of
// an inconsistent log.
MaxAppendEntries int
// BatchApplyCh indicates whether we should buffer applyCh
// to size MaxAppendEntries. This enables batch log commitment,
// but breaks the timeout guarantee on Apply. Specifically,
// a log can be added to the applyCh buffer but not actually be
// processed until after the specified timeout.
BatchApplyCh bool
// If we are a member of a cluster, and RemovePeer is invoked for the
// local node, then we forget all peers and transition into the follower state.
// If ShutdownOnRemove is set, we additional shutdown Raft. Otherwise,
// we can become a leader of a cluster containing only this node.
ShutdownOnRemove bool
// TrailingLogs controls how many logs we leave after a snapshot. This is used
// so that we can quickly replay logs on a follower instead of being forced to
// send an entire snapshot. The value passed here is the initial setting used.
// This can be tuned during operation using ReloadConfig.
TrailingLogs uint64
// SnapshotInterval controls how often we check if we should perform a
// snapshot. We randomly stagger between this value and 2x this value to avoid
// the entire cluster from performing a snapshot at once. The value passed
// here is the initial setting used. This can be tuned during operation using
// ReloadConfig.
SnapshotInterval time.Duration
// SnapshotThreshold controls how many outstanding logs there must be before
// we perform a snapshot. This is to prevent excessive snapshotting by
// replaying a small set of logs instead. The value passed here is the initial
// setting used. This can be tuned during operation using ReloadConfig.
SnapshotThreshold uint64
// LeaderLeaseTimeout is used to control how long the "lease" lasts
// for being the leader without being able to contact a quorum
// of nodes. If we reach this interval without contact, we will
// step down as leader.
LeaderLeaseTimeout time.Duration
// LocalID is a unique ID for this server across all time. When running with
// ProtocolVersion < 3, you must set this to be the same as the network
// address of your transport.
LocalID ServerID
// NotifyCh is used to provide a channel that will be notified of leadership
// changes. Raft will block writing to this channel, so it should either be
// buffered or aggressively consumed.
NotifyCh chan<- bool
// LogOutput is used as a sink for logs, unless Logger is specified.
// Defaults to os.Stderr.
LogOutput io.Writer
// LogLevel represents a log level. If the value does not match a known
// logging level hclog.NoLevel is used.
LogLevel string
// Logger is a user-provided logger. If nil, a logger writing to
// LogOutput with LogLevel is used.
Logger hclog.Logger
// NoSnapshotRestoreOnStart controls if raft will restore a snapshot to the
// FSM on start. This is useful if your FSM recovers from other mechanisms
// than raft snapshotting. Snapshot metadata will still be used to initialize
// raft's configuration and index values.
NoSnapshotRestoreOnStart bool
// skipStartup allows NewRaft() to bypass all background work goroutines
skipStartup bool
}
func (conf *Config) getOrCreateLogger() hclog.Logger {
if conf.Logger != nil {
return conf.Logger
}
if conf.LogOutput == nil {
conf.LogOutput = os.Stderr
}
return hclog.New(&hclog.LoggerOptions{
Name: "raft",
Level: hclog.LevelFromString(conf.LogLevel),
Output: conf.LogOutput,
})
}
// ReloadableConfig is the subset of Config that may be reconfigured during
// runtime using raft.ReloadConfig. We choose to duplicate fields over embedding
// or accepting a Config but only using specific fields to keep the API clear.
// Reconfiguring some fields is potentially dangerous so we should only
// selectively enable it for fields where that is allowed.
type ReloadableConfig struct {
// TrailingLogs controls how many logs we leave after a snapshot. This is used
// so that we can quickly replay logs on a follower instead of being forced to
// send an entire snapshot. The value passed here updates the setting at runtime
// which will take effect as soon as the next snapshot completes and truncation
// occurs.
TrailingLogs uint64
// SnapshotInterval controls how often we check if we should perform a snapshot.
// We randomly stagger between this value and 2x this value to avoid the entire
// cluster from performing a snapshot at once.
SnapshotInterval time.Duration
// SnapshotThreshold controls how many outstanding logs there must be before
// we perform a snapshot. This is to prevent excessive snapshots when we can
// just replay a small set of logs.
SnapshotThreshold uint64
// HeartbeatTimeout specifies the time in follower state without
// a leader before we attempt an election.
HeartbeatTimeout time.Duration
// ElectionTimeout specifies the time in candidate state without
// a leader before we attempt an election.
ElectionTimeout time.Duration
}
// apply sets the reloadable fields on the passed Config to the values in
// `ReloadableConfig`. It returns a copy of Config with the fields from this
// ReloadableConfig set.
func (rc *ReloadableConfig) apply(to Config) Config {
to.TrailingLogs = rc.TrailingLogs
to.SnapshotInterval = rc.SnapshotInterval
to.SnapshotThreshold = rc.SnapshotThreshold
to.HeartbeatTimeout = rc.HeartbeatTimeout
to.ElectionTimeout = rc.ElectionTimeout
return to
}
// fromConfig copies the reloadable fields from the passed Config.
func (rc *ReloadableConfig) fromConfig(from Config) {
rc.TrailingLogs = from.TrailingLogs
rc.SnapshotInterval = from.SnapshotInterval
rc.SnapshotThreshold = from.SnapshotThreshold
rc.HeartbeatTimeout = from.HeartbeatTimeout
rc.ElectionTimeout = from.ElectionTimeout
}
// DefaultConfig returns a Config with usable defaults.
func DefaultConfig() *Config {
return &Config{
ProtocolVersion: ProtocolVersionMax,
HeartbeatTimeout: 1000 * time.Millisecond,
ElectionTimeout: 1000 * time.Millisecond,
CommitTimeout: 50 * time.Millisecond,
MaxAppendEntries: 64,
ShutdownOnRemove: true,
TrailingLogs: 10240,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 8192,
LeaderLeaseTimeout: 500 * time.Millisecond,
LogLevel: "DEBUG",
}
}
// ValidateConfig is used to validate a sane configuration
func ValidateConfig(config *Config) error {
// We don't actually support running as 0 in the library any more, but
// we do understand it.
protocolMin := ProtocolVersionMin
if protocolMin == 0 {
protocolMin = 1
}
if config.ProtocolVersion < protocolMin ||
config.ProtocolVersion > ProtocolVersionMax {
return fmt.Errorf("ProtocolVersion %d must be >= %d and <= %d",
config.ProtocolVersion, protocolMin, ProtocolVersionMax)
}
if len(config.LocalID) == 0 {
return fmt.Errorf("LocalID cannot be empty")
}
if config.HeartbeatTimeout < 5*time.Millisecond {
return fmt.Errorf("HeartbeatTimeout is too low")
}
if config.ElectionTimeout < 5*time.Millisecond {
return fmt.Errorf("ElectionTimeout is too low")
}
if config.CommitTimeout < time.Millisecond {
return fmt.Errorf("CommitTimeout is too low")
}
if config.MaxAppendEntries <= 0 {
return fmt.Errorf("MaxAppendEntries must be positive")
}
if config.MaxAppendEntries > 1024 {
return fmt.Errorf("MaxAppendEntries is too large")
}
if config.SnapshotInterval < 5*time.Millisecond {
return fmt.Errorf("SnapshotInterval is too low")
}
if config.LeaderLeaseTimeout < 5*time.Millisecond {
return fmt.Errorf("LeaderLeaseTimeout is too low")
}
if config.LeaderLeaseTimeout > config.HeartbeatTimeout {
return fmt.Errorf("LeaderLeaseTimeout (%s) cannot be larger than heartbeat timeout (%s)", config.LeaderLeaseTimeout, config.HeartbeatTimeout)
}
if config.ElectionTimeout < config.HeartbeatTimeout {
return fmt.Errorf("ElectionTimeout (%s) must be equal or greater than Heartbeat Timeout (%s)", config.ElectionTimeout, config.HeartbeatTimeout)
}
return nil
}
|