1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
|
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package raft
import (
"fmt"
"io"
"time"
"github.com/armon/go-metrics"
)
// SnapshotMeta is for metadata of a snapshot.
type SnapshotMeta struct {
// Version is the version number of the snapshot metadata. This does not cover
// the application's data in the snapshot, that should be versioned
// separately.
Version SnapshotVersion
// ID is opaque to the store, and is used for opening.
ID string
// Index and Term store when the snapshot was taken.
Index uint64
Term uint64
// Peers is deprecated and used to support version 0 snapshots, but will
// be populated in version 1 snapshots as well to help with upgrades.
Peers []byte
// Configuration and ConfigurationIndex are present in version 1
// snapshots and later.
Configuration Configuration
ConfigurationIndex uint64
// Size is the size of the snapshot in bytes.
Size int64
}
// SnapshotStore interface is used to allow for flexible implementations
// of snapshot storage and retrieval. For example, a client could implement
// a shared state store such as S3, allowing new nodes to restore snapshots
// without streaming from the leader.
type SnapshotStore interface {
// Create is used to begin a snapshot at a given index and term, and with
// the given committed configuration. The version parameter controls
// which snapshot version to create.
Create(version SnapshotVersion, index, term uint64, configuration Configuration,
configurationIndex uint64, trans Transport) (SnapshotSink, error)
// List is used to list the available snapshots in the store.
// It should return then in descending order, with the highest index first.
List() ([]*SnapshotMeta, error)
// Open takes a snapshot ID and provides a ReadCloser. Once close is
// called it is assumed the snapshot is no longer needed.
Open(id string) (*SnapshotMeta, io.ReadCloser, error)
}
// SnapshotSink is returned by StartSnapshot. The FSM will Write state
// to the sink and call Close on completion. On error, Cancel will be invoked.
type SnapshotSink interface {
io.WriteCloser
ID() string
Cancel() error
}
// runSnapshots is a long running goroutine used to manage taking
// new snapshots of the FSM. It runs in parallel to the FSM and
// main goroutines, so that snapshots do not block normal operation.
func (r *Raft) runSnapshots() {
for {
select {
case <-randomTimeout(r.config().SnapshotInterval):
// Check if we should snapshot
if !r.shouldSnapshot() {
continue
}
// Trigger a snapshot
if _, err := r.takeSnapshot(); err != nil {
r.logger.Error("failed to take snapshot", "error", err)
}
case future := <-r.userSnapshotCh:
// User-triggered, run immediately
id, err := r.takeSnapshot()
if err != nil {
r.logger.Error("failed to take snapshot", "error", err)
} else {
future.opener = func() (*SnapshotMeta, io.ReadCloser, error) {
return r.snapshots.Open(id)
}
}
future.respond(err)
case <-r.shutdownCh:
return
}
}
}
// shouldSnapshot checks if we meet the conditions to take
// a new snapshot.
func (r *Raft) shouldSnapshot() bool {
// Check the last snapshot index
lastSnap, _ := r.getLastSnapshot()
// Check the last log index
lastIdx, err := r.logs.LastIndex()
if err != nil {
r.logger.Error("failed to get last log index", "error", err)
return false
}
// Compare the delta to the threshold
delta := lastIdx - lastSnap
return delta >= r.config().SnapshotThreshold
}
// takeSnapshot is used to take a new snapshot. This must only be called from
// the snapshot thread, never the main thread. This returns the ID of the new
// snapshot, along with an error.
func (r *Raft) takeSnapshot() (string, error) {
defer metrics.MeasureSince([]string{"raft", "snapshot", "takeSnapshot"}, time.Now())
// Create a request for the FSM to perform a snapshot.
snapReq := &reqSnapshotFuture{}
snapReq.init()
// Wait for dispatch or shutdown.
select {
case r.fsmSnapshotCh <- snapReq:
case <-r.shutdownCh:
return "", ErrRaftShutdown
}
// Wait until we get a response
if err := snapReq.Error(); err != nil {
if err != ErrNothingNewToSnapshot {
err = fmt.Errorf("failed to start snapshot: %v", err)
}
return "", err
}
defer snapReq.snapshot.Release()
// Make a request for the configurations and extract the committed info.
// We have to use the future here to safely get this information since
// it is owned by the main thread.
configReq := &configurationsFuture{}
configReq.ShutdownCh = r.shutdownCh
configReq.init()
select {
case r.configurationsCh <- configReq:
case <-r.shutdownCh:
return "", ErrRaftShutdown
}
if err := configReq.Error(); err != nil {
return "", err
}
committed := configReq.configurations.committed
committedIndex := configReq.configurations.committedIndex
// We don't support snapshots while there's a config change outstanding
// since the snapshot doesn't have a means to represent this state. This
// is a little weird because we need the FSM to apply an index that's
// past the configuration change, even though the FSM itself doesn't see
// the configuration changes. It should be ok in practice with normal
// application traffic flowing through the FSM. If there's none of that
// then it's not crucial that we snapshot, since there's not much going
// on Raft-wise.
if snapReq.index < committedIndex {
return "", fmt.Errorf("cannot take snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
committedIndex, snapReq.index)
}
// Create a new snapshot.
r.logger.Info("starting snapshot up to", "index", snapReq.index)
start := time.Now()
version := getSnapshotVersion(r.protocolVersion)
sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans)
if err != nil {
return "", fmt.Errorf("failed to create snapshot: %v", err)
}
metrics.MeasureSince([]string{"raft", "snapshot", "create"}, start)
// Try to persist the snapshot.
start = time.Now()
if err := snapReq.snapshot.Persist(sink); err != nil {
sink.Cancel()
return "", fmt.Errorf("failed to persist snapshot: %v", err)
}
metrics.MeasureSince([]string{"raft", "snapshot", "persist"}, start)
// Close and check for error.
if err := sink.Close(); err != nil {
return "", fmt.Errorf("failed to close snapshot: %v", err)
}
// Update the last stable snapshot info.
r.setLastSnapshot(snapReq.index, snapReq.term)
// Compact the logs.
if err := r.compactLogs(snapReq.index); err != nil {
return "", err
}
r.logger.Info("snapshot complete up to", "index", snapReq.index)
return sink.ID(), nil
}
// compactLogsWithTrailing takes the last inclusive index of a snapshot,
// the lastLogIdx, and and the trailingLogs and trims the logs that
// are no longer needed.
func (r *Raft) compactLogsWithTrailing(snapIdx uint64, lastLogIdx uint64, trailingLogs uint64) error {
// Determine log ranges to compact
minLog, err := r.logs.FirstIndex()
if err != nil {
return fmt.Errorf("failed to get first log index: %v", err)
}
// Check if we have enough logs to truncate
// Use a consistent value for trailingLogs for the duration of this method
// call to avoid surprising behaviour.
if lastLogIdx <= trailingLogs {
return nil
}
// Truncate up to the end of the snapshot, or `TrailingLogs`
// back from the head, which ever is further back. This ensures
// at least `TrailingLogs` entries, but does not allow logs
// after the snapshot to be removed.
maxLog := min(snapIdx, lastLogIdx-trailingLogs)
if minLog > maxLog {
r.logger.Info("no logs to truncate")
return nil
}
r.logger.Info("compacting logs", "from", minLog, "to", maxLog)
// Compact the logs
if err := r.logs.DeleteRange(minLog, maxLog); err != nil {
return fmt.Errorf("log compaction failed: %v", err)
}
return nil
}
// compactLogs takes the last inclusive index of a snapshot
// and trims the logs that are no longer needed.
func (r *Raft) compactLogs(snapIdx uint64) error {
defer metrics.MeasureSince([]string{"raft", "compactLogs"}, time.Now())
lastLogIdx, _ := r.getLastLog()
trailingLogs := r.config().TrailingLogs
return r.compactLogsWithTrailing(snapIdx, lastLogIdx, trailingLogs)
}
// removeOldLogs removes all old logs from the store. This is used for
// MonotonicLogStores after restore. Callers should verify that the store
// implementation is monotonic prior to calling.
func (r *Raft) removeOldLogs() error {
defer metrics.MeasureSince([]string{"raft", "removeOldLogs"}, time.Now())
lastLogIdx, err := r.logs.LastIndex()
if err != nil {
return fmt.Errorf("failed to get last log index: %w", err)
}
r.logger.Info("removing all old logs from log store")
// call compactLogsWithTrailing with lastLogIdx for snapIdx since
// it will take the lesser of lastLogIdx and snapIdx to figure out
// the end for which to apply trailingLogs.
return r.compactLogsWithTrailing(lastLogIdx, lastLogIdx, 0)
}
|