1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
|
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package raft
import (
"fmt"
"io"
"sync"
"time"
)
// Future is used to represent an action that may occur in the future.
type Future interface {
// Error blocks until the future arrives and then returns the error status
// of the future. This may be called any number of times - all calls will
// return the same value, however is not OK to call this method twice
// concurrently on the same Future instance.
// Error will only return generic errors related to raft, such
// as ErrLeadershipLost, or ErrRaftShutdown. Some operations, such as
// ApplyLog, may also return errors from other methods.
Error() error
}
// IndexFuture is used for future actions that can result in a raft log entry
// being created.
type IndexFuture interface {
Future
// Index holds the index of the newly applied log entry.
// This must not be called until after the Error method has returned.
Index() uint64
}
// ApplyFuture is used for Apply and can return the FSM response.
type ApplyFuture interface {
IndexFuture
// Response returns the FSM response as returned by the FSM.Apply method. This
// must not be called until after the Error method has returned.
// Note that if FSM.Apply returns an error, it will be returned by Response,
// and not by the Error method, so it is always important to check Response
// for errors from the FSM.
Response() interface{}
}
// ConfigurationFuture is used for GetConfiguration and can return the
// latest configuration in use by Raft.
type ConfigurationFuture interface {
IndexFuture
// Configuration contains the latest configuration. This must
// not be called until after the Error method has returned.
Configuration() Configuration
}
// SnapshotFuture is used for waiting on a user-triggered snapshot to complete.
type SnapshotFuture interface {
Future
// Open is a function you can call to access the underlying snapshot and
// its metadata. This must not be called until after the Error method
// has returned.
Open() (*SnapshotMeta, io.ReadCloser, error)
}
// LeadershipTransferFuture is used for waiting on a user-triggered leadership
// transfer to complete.
type LeadershipTransferFuture interface {
Future
}
// errorFuture is used to return a static error.
type errorFuture struct {
err error
}
func (e errorFuture) Error() error {
return e.err
}
func (e errorFuture) Response() interface{} {
return nil
}
func (e errorFuture) Index() uint64 {
return 0
}
// deferError can be embedded to allow a future
// to provide an error in the future.
type deferError struct {
err error
errCh chan error
responded bool
ShutdownCh chan struct{}
}
func (d *deferError) init() {
d.errCh = make(chan error, 1)
}
func (d *deferError) Error() error {
if d.err != nil {
// Note that when we've received a nil error, this
// won't trigger, but the channel is closed after
// send so we'll still return nil below.
return d.err
}
if d.errCh == nil {
panic("waiting for response on nil channel")
}
select {
case d.err = <-d.errCh:
case <-d.ShutdownCh:
d.err = ErrRaftShutdown
}
return d.err
}
func (d *deferError) respond(err error) {
if d.errCh == nil {
return
}
if d.responded {
return
}
d.errCh <- err
close(d.errCh)
d.responded = true
}
// There are several types of requests that cause a configuration entry to
// be appended to the log. These are encoded here for leaderLoop() to process.
// This is internal to a single server.
type configurationChangeFuture struct {
logFuture
req configurationChangeRequest
}
// bootstrapFuture is used to attempt a live bootstrap of the cluster. See the
// Raft object's BootstrapCluster member function for more details.
type bootstrapFuture struct {
deferError
// configuration is the proposed bootstrap configuration to apply.
configuration Configuration
}
// logFuture is used to apply a log entry and waits until
// the log is considered committed.
type logFuture struct {
deferError
log Log
response interface{}
dispatch time.Time
}
func (l *logFuture) Response() interface{} {
return l.response
}
func (l *logFuture) Index() uint64 {
return l.log.Index
}
type shutdownFuture struct {
raft *Raft
}
func (s *shutdownFuture) Error() error {
if s.raft == nil {
return nil
}
s.raft.waitShutdown()
if closeable, ok := s.raft.trans.(WithClose); ok {
closeable.Close()
}
return nil
}
// userSnapshotFuture is used for waiting on a user-triggered snapshot to
// complete.
type userSnapshotFuture struct {
deferError
// opener is a function used to open the snapshot. This is filled in
// once the future returns with no error.
opener func() (*SnapshotMeta, io.ReadCloser, error)
}
// Open is a function you can call to access the underlying snapshot and its
// metadata.
func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) {
if u.opener == nil {
return nil, nil, fmt.Errorf("no snapshot available")
}
// Invalidate the opener so it can't get called multiple times,
// which isn't generally safe.
defer func() {
u.opener = nil
}()
return u.opener()
}
// userRestoreFuture is used for waiting on a user-triggered restore of an
// external snapshot to complete.
type userRestoreFuture struct {
deferError
// meta is the metadata that belongs with the snapshot.
meta *SnapshotMeta
// reader is the interface to read the snapshot contents from.
reader io.Reader
}
// reqSnapshotFuture is used for requesting a snapshot start.
// It is only used internally.
type reqSnapshotFuture struct {
deferError
// snapshot details provided by the FSM runner before responding
index uint64
term uint64
snapshot FSMSnapshot
}
// restoreFuture is used for requesting an FSM to perform a
// snapshot restore. Used internally only.
type restoreFuture struct {
deferError
ID string
}
// verifyFuture is used to verify the current node is still
// the leader. This is to prevent a stale read.
type verifyFuture struct {
deferError
notifyCh chan *verifyFuture
quorumSize int
votes int
voteLock sync.Mutex
}
// leadershipTransferFuture is used to track the progress of a leadership
// transfer internally.
type leadershipTransferFuture struct {
deferError
ID *ServerID
Address *ServerAddress
}
// configurationsFuture is used to retrieve the current configurations. This is
// used to allow safe access to this information outside of the main thread.
type configurationsFuture struct {
deferError
configurations configurations
}
// Configuration returns the latest configuration in use by Raft.
func (c *configurationsFuture) Configuration() Configuration {
return c.configurations.latest
}
// Index returns the index of the latest configuration in use by Raft.
func (c *configurationsFuture) Index() uint64 {
return c.configurations.latestIndex
}
// vote is used to respond to a verifyFuture.
// This may block when responding on the notifyCh.
func (v *verifyFuture) vote(leader bool) {
v.voteLock.Lock()
defer v.voteLock.Unlock()
// Guard against having notified already
if v.notifyCh == nil {
return
}
if leader {
v.votes++
if v.votes >= v.quorumSize {
v.notifyCh <- v
v.notifyCh = nil
}
} else {
v.notifyCh <- v
v.notifyCh = nil
}
}
// appendFuture is used for waiting on a pipelined append
// entries RPC.
type appendFuture struct {
deferError
start time.Time
args *AppendEntriesRequest
resp *AppendEntriesResponse
}
func (a *appendFuture) Start() time.Time {
return a.start
}
func (a *appendFuture) Request() *AppendEntriesRequest {
return a.args
}
func (a *appendFuture) Response() *AppendEntriesResponse {
return a.resp
}
|