// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package raft
import (
"bytes"
"fmt"
"io"
"sync"
)
// InmemSnapshotStore implements the SnapshotStore interface and
// retains only the most recent snapshot. Each call to Create replaces the
// previously held snapshot; nothing is persisted outside process memory.
type InmemSnapshotStore struct {
// latest is the sink backing the current snapshot. The constructor seeds it
// with an empty placeholder, and Create swaps in a new sink.
latest *InmemSnapshotSink
// hasSnapshot is set once Create has been called; List reports no snapshots
// while it is false.
hasSnapshot bool
// The embedded lock guards latest and hasSnapshot: Create takes the write
// lock, List and Open take the read lock.
sync.RWMutex
}
// InmemSnapshotSink implements SnapshotSink in memory. It pairs the
// metadata describing a snapshot with a buffer holding the bytes written to it.
type InmemSnapshotSink struct {
// meta describes the snapshot; its Size field is advanced by Write.
meta SnapshotMeta
// contents accumulates every byte passed to Write.
contents *bytes.Buffer
}
// NewInmemSnapshotStore returns an InmemSnapshotStore that holds no snapshot
// yet. The store is seeded with a placeholder sink (zero-value metadata and an
// empty buffer) so that latest is never nil.
func NewInmemSnapshotStore() *InmemSnapshotStore {
	placeholder := &InmemSnapshotSink{contents: new(bytes.Buffer)}
	return &InmemSnapshotStore{latest: placeholder}
}
// Create discards whatever snapshot the store currently holds and installs a
// new, empty one described by the given arguments. The returned sink is the
// same object the store keeps, so bytes written to it become the contents of
// the stored snapshot. Only snapshot version 1 is accepted; any other version
// yields an error and leaves the store untouched.
func (m *InmemSnapshotStore) Create(version SnapshotVersion, index, term uint64,
	configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
	// Reject anything other than version 1 before touching shared state.
	if version != 1 {
		return nil, fmt.Errorf("unsupported snapshot version %d", version)
	}

	id := snapshotName(term, index)

	m.Lock()
	defer m.Unlock()

	fresh := &InmemSnapshotSink{contents: new(bytes.Buffer)}
	fresh.meta = SnapshotMeta{
		Version:            version,
		ID:                 id,
		Index:              index,
		Term:               term,
		Peers:              encodePeers(configuration, trans),
		Configuration:      configuration,
		ConfigurationIndex: configurationIndex,
	}

	m.latest = fresh
	m.hasSnapshot = true
	return fresh, nil
}
// List reports the snapshots held by the store: a single-element slice with the
// metadata of the most recent snapshot, or an empty (non-nil) slice when Create
// has never been called. The error is always nil.
func (m *InmemSnapshotStore) List() ([]*SnapshotMeta, error) {
	m.RLock()
	defer m.RUnlock()

	metas := []*SnapshotMeta{}
	if m.hasSnapshot {
		// The entry points at the metadata owned by the stored sink.
		metas = append(metas, &m.latest.meta)
	}
	return metas, nil
}
// Open returns the metadata of the stored snapshot together with an
// io.ReadCloser over its contents.
//
// id must match the ID of the snapshot most recently installed by Create. An
// error is returned when the IDs differ, or when the store holds no snapshot
// at all. The latter matters because a fresh store carries a placeholder sink
// whose ID is the empty string: without the hasSnapshot guard, Open("") would
// succeed on an empty store even though List reports nothing.
//
// The returned metadata pointer refers to the metadata owned by the stored
// sink, and the reader views the sink's bytes without copying them. Closing the
// reader is a no-op.
func (m *InmemSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) {
	m.RLock()
	defer m.RUnlock()

	if !m.hasSnapshot || m.latest.meta.ID != id {
		return nil, nil, fmt.Errorf("[ERR] snapshot: failed to open snapshot id: %s", id)
	}

	// Reading from a bytes.Buffer consumes it, so hand out a separate Buffer
	// over the same bytes. Each caller gets an independent read cursor and the
	// sink's own buffer is never drained, which lets Open be called repeatedly.
	contents := bytes.NewBuffer(m.latest.contents.Bytes())
	return &m.latest.meta, io.NopCloser(contents), nil
}
// Write appends p to the snapshot contents and grows the recorded snapshot
// Size by the number of bytes the buffer accepted. It returns that byte count
// along with any error reported by the underlying buffer.
func (s *InmemSnapshotSink) Write(p []byte) (n int, err error) {
	n, err = s.contents.Write(p)
	// Count only what was actually stored, even if the write reported an error.
	s.meta.Size += int64(n)
	return n, err
}
// Close is a no-op that always returns nil. It does not touch the metadata:
// the snapshot Size is kept current by Write as bytes arrive, so there is
// nothing left to finalize here.
func (s *InmemSnapshotSink) Close() error {
return nil
}
// ID returns the ID recorded in this sink's SnapshotMeta. For sinks produced
// by Create this is the name derived from the snapshot's term and index.
func (s *InmemSnapshotSink) ID() string {
return s.meta.ID
}
// Cancel is a no-op that always returns nil. It neither discards bytes already
// written to the sink nor changes any metadata.
func (s *InmemSnapshotSink) Cancel() error {
return nil
}
|