File: bbolt.go

package info (click to toggle)
golang-github-smallstep-nosql 0.3.8-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 204 kB
  • sloc: makefile: 41
file content (272 lines) | stat: -rw-r--r-- 7,437 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
package bolt

import (
	"bytes"
	"time"

	"github.com/pkg/errors"
	"github.com/smallstep/nosql/database"
	bolt "go.etcd.io/bbolt"
)

// boltDBSep is the separator used to split a bucket name into its nested
// bucket path segments (e.g. "parent/child").
var boltDBSep = []byte("/")

// DB is a wrapper over bolt.DB.
type DB struct {
	db *bolt.DB // underlying bolt handle; assigned by Open
}

// boltBucket lists the bucket-management methods that createBucket and
// deleteBucket call. Both helpers start from a *bolt.Tx converted to this
// interface, and createBucket also stores the *bolt.Bucket values it gets
// back in the same variable while walking a nested bucket path.
type boltBucket interface {
	Bucket(name []byte) *bolt.Bucket
	CreateBucket(name []byte) (*bolt.Bucket, error)
	CreateBucketIfNotExists(name []byte) (*bolt.Bucket, error)
	DeleteBucket(name []byte) error
}

// Open opens or creates the bolt database stored at dataSourceName and keeps
// the resulting handle in db.
//
// Every database.Option is applied to a fresh database.Options value and the
// first option that fails aborts Open with that error; the collected options
// are not otherwise consulted here. The file is opened with mode 0600 and a
// bolt Timeout of five seconds. Any error from bolt.Open is returned with a
// stack trace attached.
func (db *DB) Open(dataSourceName string, opt ...database.Option) (err error) {
	opts := new(database.Options)
	for _, apply := range opt {
		if optErr := apply(opts); optErr != nil {
			return optErr
		}
	}

	boltOpts := &bolt.Options{Timeout: 5 * time.Second}
	db.db, err = bolt.Open(dataSourceName, 0600, boltOpts)
	return errors.WithStack(err)
}

// Close closes the underlying bolt database. A failure reported by bolt is
// returned with a stack trace attached.
func (db *DB) Close() error {
	closeErr := db.db.Close()
	return errors.WithStack(closeErr)
}

// CreateTable creates a bucket, or an embedded bucket, if it does not exist
// yet. The work runs inside a single read-write bolt transaction and is
// delegated to createBucket.
func (db *DB) CreateTable(bucket []byte) error {
	create := func(tx *bolt.Tx) error {
		return db.createBucket(tx, bucket)
	}
	return db.db.Update(create)
}

// DeleteTable deletes a root or embedded bucket inside a single read-write
// bolt transaction, delegating to deleteBucket. It returns an error if the
// bucket cannot be found or if the key represents a non-bucket value.
func (db *DB) DeleteTable(bucket []byte) error {
	remove := func(tx *bolt.Tx) error {
		return db.deleteBucket(tx, bucket)
	}
	return db.db.Update(remove)
}

// Get returns the value stored in the given bucket and key.
//
// The lookup runs in a read-only bolt transaction. The error from getBucket
// is returned unchanged when the bucket cannot be resolved, and
// database.ErrNotFound is returned when the key has no value. On success the
// result is a private copy of the stored bytes.
func (db *DB) Get(bucket, key []byte) (ret []byte, err error) {
	err = db.db.View(func(tx *bolt.Tx) error {
		b, lookupErr := db.getBucket(tx, bucket)
		if lookupErr != nil {
			return lookupErr
		}

		stored := b.Get(key)
		if stored == nil {
			return database.ErrNotFound
		}

		// The slice handed out by bolt is only valid while the transaction
		// is open, so hand the caller its own copy.
		ret = cloneBytes(stored)
		return nil
	})
	return ret, err
}

// Set stores value under key in the given bucket using a read-write bolt
// transaction. The error from getBucket is returned unchanged when the bucket
// cannot be resolved; a failure from Put is returned with a stack trace.
func (db *DB) Set(bucket, key, value []byte) error {
	store := func(tx *bolt.Tx) error {
		target, err := db.getBucket(tx, bucket)
		if err != nil {
			return err
		}
		putErr := target.Put(key, value)
		return errors.WithStack(putErr)
	}
	return db.db.Update(store)
}

// Del deletes the value stored in the given bucket and key using a read-write
// bolt transaction. The error from getBucket is returned unchanged when the
// bucket cannot be resolved; a failure from Delete is returned with a stack
// trace.
func (db *DB) Del(bucket, key []byte) error {
	remove := func(tx *bolt.Tx) error {
		target, err := db.getBucket(tx, bucket)
		if err != nil {
			return err
		}
		delErr := target.Delete(key)
		return errors.WithStack(delErr)
	}
	return db.db.Update(remove)
}

// List returns every entry found in a bucket.
//
// The bucket is walked with a cursor inside a read-only bolt transaction.
// Each key and value is copied before being placed in the result, and every
// entry carries the bucket name that was passed in. A bucket that cannot be
// resolved yields the getBucket error wrapped with "getBucket failed".
func (db *DB) List(bucket []byte) ([]*database.Entry, error) {
	var entries []*database.Entry

	collect := func(tx *bolt.Tx) error {
		b, err := db.getBucket(tx, bucket)
		if err != nil {
			return errors.Wrap(err, "getBucket failed")
		}

		cur := b.Cursor()
		k, v := cur.First()
		for k != nil {
			entry := &database.Entry{
				Bucket: bucket,
				Key:    cloneBytes(k),
				Value:  cloneBytes(v),
			}
			entries = append(entries, entry)
			k, v = cur.Next()
		}
		return nil
	}

	err := db.db.View(collect)
	return entries, err
}

// CmpAndSwap modifies the value at the given bucket and key (to newValue)
// only if the existing (current) value matches oldValue.
//
// The bucket name is resolved with getBucket, so nested names separated by
// '/' are accepted just like in Get, Set and Del.
//
// Returns:
//   - the value now associated with the key: newValue when the swap happened,
//     otherwise a copy of the value that was found;
//   - whether the swap happened;
//   - an error if the transaction could not be started, the bucket does not
//     exist, the write failed, or the transaction could not be committed or
//     rolled back.
//
// The read-write transaction is started manually, so every return path after
// Begin must end it with Commit or Rollback.
func (db *DB) CmpAndSwap(bucket, key, oldValue, newValue []byte) ([]byte, bool, error) {
	boltTx, err := db.db.Begin(true)
	if err != nil {
		return nil, false, errors.Wrap(err, "error creating Bolt transaction")
	}

	b, err := db.getBucket(boltTx, bucket)
	if err != nil {
		// The transaction must be ended here as well; returning with it
		// still open would leave the read-write transaction dangling.
		if rbErr := boltTx.Rollback(); rbErr != nil {
			return nil, false, errors.Wrapf(rbErr, "failed to get bucket %s and failed to rollback transaction", bucket)
		}
		return nil, false, errors.Errorf("failed to get bucket %s", bucket)
	}

	val, swapped, err := cmpAndSwap(b, key, oldValue, newValue)
	switch {
	case err != nil:
		if rbErr := boltTx.Rollback(); rbErr != nil {
			return nil, false, errors.Wrapf(rbErr, "failed to execute CmpAndSwap transaction on %s/%s and failed to rollback transaction", bucket, key)
		}
		return nil, false, err
	case swapped:
		if cmErr := boltTx.Commit(); cmErr != nil {
			return nil, false, errors.Wrapf(cmErr, "failed to commit bolt transaction")
		}
		return val, true, nil
	default:
		// Nothing was written, so the transaction is simply discarded.
		if rbErr := boltTx.Rollback(); rbErr != nil {
			return nil, false, errors.Wrapf(rbErr, "failed to rollback read-only CmpAndSwap transaction on %s/%s", bucket, key)
		}
		return val, false, nil
	}
}

// cmpAndSwap writes newValue under key in boltBucket when the value currently
// stored there equals oldValue (compared with bytes.Equal, which treats nil
// and empty slices as equal).
//
// It returns newValue and true when the write happened, or a copy of the
// current value and false when the comparison failed. A failing Put is
// reported as a wrapped error alongside a nil value.
func cmpAndSwap(boltBucket *bolt.Bucket, key, oldValue, newValue []byte) ([]byte, bool, error) {
	current := boltBucket.Get(key)

	if bytes.Equal(current, oldValue) {
		putErr := boltBucket.Put(key, newValue)
		if putErr != nil {
			return nil, false, errors.Wrapf(putErr, "failed to set key %s", key)
		}
		return newValue, true, nil
	}

	// No match: report what is actually stored, detached from bolt's memory.
	return cloneBytes(current), false, nil
}

// Update performs multiple commands on one read-write transaction.
//
// Operations are executed in order. CreateTable and DeleteTable act on the
// bucket itself; Get, Set, Delete and CmpAndSwap first resolve the bucket
// with getBucket (so nested names separated by '/' are supported) and fail
// with a wrapped database.ErrNotFound when it does not exist. Get stores a
// copy of the value in the operation's Result, and CmpAndSwap fills Result
// and Swapped. The first failing operation's error is returned from the
// transaction function passed to bolt's Update.
func (db *DB) Update(tx *database.Tx) error {
	return db.db.Update(func(boltTx *bolt.Tx) (err error) {
		var b *bolt.Bucket
		for _, q := range tx.Operations {
			// Handle bucket-level commands and reject commands that cannot
			// be executed before any bucket lookup takes place.
			switch q.Cmd {
			case database.CreateTable:
				if err = db.createBucket(boltTx, q.Bucket); err != nil {
					return err
				}
				continue
			case database.DeleteTable:
				if err = db.deleteBucket(boltTx, q.Bucket); err != nil {
					return err
				}
				continue
			case database.Get, database.Set, database.Delete, database.CmpAndSwap:
				// Key-level commands: handled below once the bucket is known.
			case database.CmpOrRollback:
				return errors.Errorf("operation '%s' is not yet implemented", q.Cmd)
			default:
				return errors.Errorf("operation '%s' is not supported", q.Cmd)
			}

			// Resolve the bucket and fail cleanly when it is missing; a nil
			// bucket must never reach the calls below.
			b, err = db.getBucket(boltTx, q.Bucket)
			if err != nil {
				return errors.Wrapf(err, "bucket %s does not exist", q.Bucket)
			}

			switch q.Cmd {
			case database.Get:
				ret := b.Get(q.Key)
				if ret == nil {
					return errors.WithStack(database.ErrNotFound)
				}
				q.Result = cloneBytes(ret)
			case database.Set:
				if err = b.Put(q.Key, q.Value); err != nil {
					return errors.WithStack(err)
				}
			case database.Delete:
				if err = b.Delete(q.Key); err != nil {
					return errors.WithStack(err)
				}
			case database.CmpAndSwap:
				q.Result, q.Swapped, err = cmpAndSwap(b, q.Key, q.CmpValue, q.Value)
				if err != nil {
					return errors.Wrapf(err, "failed to execute CmpAndSwap on %s/%s", q.Bucket, q.Key)
				}
			}
		}
		return nil
	})
}

// getBucket resolves a bucket name inside the given transaction. Nested
// buckets are addressed by joining their names with '/': the first segment is
// looked up on the transaction and each following segment on the bucket found
// so far. database.ErrNotFound is returned as soon as a segment is missing.
func (db *DB) getBucket(tx *bolt.Tx, name []byte) (b *bolt.Bucket, err error) {
	for depth, segment := range bytes.Split(name, boltDBSep) {
		switch depth {
		case 0:
			b = tx.Bucket(segment)
		default:
			b = b.Bucket(segment)
		}
		if b == nil {
			return nil, database.ErrNotFound
		}
	}
	return b, nil
}

// createBucket creates a bucket or a nested bucket in the given transaction.
// The name is split on '/' and every segment is created, if it does not exist
// yet, inside the bucket produced by the previous segment (the transaction
// itself for the first one). A failure is returned with a stack trace.
func (db *DB) createBucket(tx *bolt.Tx, name []byte) (err error) {
	var parent boltBucket = tx
	for _, segment := range bytes.Split(name, boltDBSep) {
		child, createErr := parent.CreateBucketIfNotExists(segment)
		if createErr != nil {
			return errors.WithStack(createErr)
		}
		parent = child
	}
	return nil
}

// deleteBucket deletes a bucket or a nested bucket in the given transaction.
//
// The name is split on '/'. Every segment but the last is resolved in turn,
// each one inside the bucket found for the previous segment, and the last
// segment is then deleted from the innermost parent. A missing parent or a
// missing target is reported as a wrapped database.ErrNotFound; any other
// error from DeleteBucket is returned unchanged.
func (db *DB) deleteBucket(tx *bolt.Tx, name []byte) (err error) {
	b := boltBucket(tx)
	buckets := bytes.Split(name, boltDBSep)
	last := len(buckets) - 1
	for i := 0; i < last; i++ {
		buck := b.Bucket(buckets[i])
		if buck == nil {
			return errors.Wrapf(database.ErrNotFound, "bucket %s does not exist", bytes.Join(buckets[0:i+1], boltDBSep))
		}
		// Descend into the parent so the next lookup, and the final delete,
		// act on it rather than on the transaction root.
		b = buck
	}
	err = b.DeleteBucket(buckets[last])
	if err == bolt.ErrBucketNotFound {
		return errors.Wrapf(database.ErrNotFound, "bucket %s does not exist", name)
	}
	return err
}

// cloneBytes returns a copy of v backed by its own array. The result is never
// nil: a nil or empty input yields an empty, non-nil slice.
func cloneBytes(v []byte) []byte {
	return append(make([]byte, 0, len(v)), v...)
}