File: persist.go

package info (click to toggle)
golang-github-juju-utils 0.0~git20200923.4646bfe-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 1,324 kB
  • sloc: makefile: 37
file content (231 lines) | stat: -rw-r--r-- 6,268 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
// Copyright 2017 Canonical Ltd.
// Licensed under the LGPLv3, see LICENCE file for details.

// Package mgokv defines cached MongoDB-backed global persistent storage for
// key-value pairs.
//
// It is designed to be used when there is a small set of attributes that change infrequently.
// It shouldn't be used when there's an unbounded set of keys, as key
// entries are not deleted.
package mgokv

import (
	"sync"
	"time"

	"gopkg.in/errgo.v1"
	mgo "gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// ErrNotFound is returned as the cause of the error
// when an entry is not found.
var ErrNotFound = errgo.New("persistent data entry not found")

type entry struct {
	// value holds the BSON-marshaled value. It will be empty
	// if the entry was not found when fetched.
	value []byte
	// expire holds when the value will expire
	// from the cache. This will be zero when the
	// value does not exist.
	expire time.Time
}

// Store represents a cached set of key-value pairs.
type Store struct {
	cacheLifetime time.Duration
	mu            sync.RWMutex
	// entries holds all the cached entries.
	entries map[string]entry
	coll    *mgo.Collection
}

// Refresh forgets all cached items.
func (s *Store) Refresh() {
	s.mu.Lock()
	s.entries = make(map[string]entry)
	s.mu.Unlock()
}

// NewStore returns a Store that will cache items for at most the given
// time in the given collection. The session in the collection will not
// be used - the session passed to Store.Session will be used instead.
func NewStore(cacheLifetime time.Duration, c *mgo.Collection) *Store {
	return &Store{
		entries:       make(map[string]entry),
		cacheLifetime: cacheLifetime,
		coll:          c,
	}
}

// Session associates a Store instance with a mongo session.
type Session struct {
	*Store
	coll *mgo.Collection
}

// entryDoc holds the document that's stored in MongoDB.
type entryDoc struct {
	Key string `bson:"_id"`
	// Value holds the value. We store it as a raw value
	// so that it looks nice when looking at the collection directly.
	Value bson.Raw `bson:"value"`
}

// Session returns a store session that uses the given
// session for storage. Each store entry is stored
// in a document in the collection.
func (s *Store) Session(session *mgo.Session) *Session {
	return &Session{
		Store: s,
		coll:  s.coll.With(session),
	}
}

// Put stores the given value for the given key. The value must be a struct type that is
// marshalable as BSON (see http://gopkg.in/mgo.v2/bson).
func (s *Session) Put(key string, val interface{}) error {
	return s.putAtTime(key, val, time.Now())
}

// putAtTime is the internal version of Put - it takes the current time
// as an argument for testing.
func (s *Session) putAtTime(key string, val interface{}, now time.Time) error {
	data, err := bson.Marshal(val)
	if err != nil {
		return errgo.Mask(err)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	_, err = s.coll.UpsertId(key, bson.D{{
		"$set", bson.D{{"value", bson.Raw{
			Kind: 3,
			Data: data,
		}}},
	}})
	if err != nil {
		return errgo.Notef(err, "cannot put %q", key)
	}
	s.entries[key] = entry{
		expire: now.Add(s.cacheLifetime),
		value:  data,
	}
	return nil
}

// PutInitial puts an initial value for the given key. It does
// nothing if there is already a value stored for the key.
// It reports whether the value was actually set.
func (s *Session) PutInitial(key string, val interface{}) (bool, error) {
	return s.putInitialAtTime(key, val, time.Now())
}

// Update updates the value using the MongoDB update operation
// specified in update. The value is stored in the "value" field
// in the document.
//
// For example, if a value of type struct { N int } is associated
// with a key, then:
//
//	s.Update(key, bson.M{"$inc": bson.M{"value.n": 1}})
//
// will atomically increment the N value.
//
// If there is no value associated with the key, Update
// returns ErrNotFound.
func (s *Session) Update(key string, update interface{}) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.coll.UpdateId(key, update); err != nil {
		if err == mgo.ErrNotFound {
			return ErrNotFound
		}
		return errgo.Mask(err)
	}
	// We can't easily find the new value so just delete the
	// item from the cache so it will be fetched next time.
	delete(s.entries, key)
	return nil
}

// putInitialAtTime is the internal version of PutInitial - it takes the current time
// as an argument for testing.
func (s *Session) putInitialAtTime(key string, val interface{}, now time.Time) (bool, error) {
	data, err := bson.Marshal(val)
	if err != nil {
		return false, errgo.Mask(err)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	err = s.coll.Insert(&entryDoc{
		Key: key,
		Value: bson.Raw{
			Kind: 3,
			Data: data,
		},
	})
	if mgo.IsDup(err) {
		return false, nil
	}
	if err != nil {
		return false, errgo.Mask(err)
	}
	s.entries[key] = entry{
		expire: now.Add(s.cacheLifetime),
		value:  data,
	}
	return true, nil
}

// Get gets the value associated with the given key into the
// value pointed to by v, which should be a pointer to
// the same struct type used to put the value originally.
//
// If the value is not found, it returns ErrNotFound.
func (s *Session) Get(key string, v interface{}) error {
	return s.getAtTime(key, v, time.Now())
}

// getAtTime is the internal version of Get - it takes the current time
// as an argument for testing.
func (s *Session) getAtTime(key string, v interface{}, now time.Time) error {
	e, err := s.getEntryAtTime(key, now)
	if err != nil {
		return errgo.Mask(err)
	}
	if e.value == nil {
		return ErrNotFound
	}
	if err := bson.Unmarshal(e.value, v); err != nil {
		return errgo.Notef(err, "cannot unmarshal data for key %q into %T", key, v)
	}
	return nil
}

func (s *Session) getEntryAtTime(key string, now time.Time) (entry, error) {
	s.mu.RLock()
	e, ok := s.entries[key]
	s.mu.RUnlock()
	if ok && now.Before(e.expire) {
		return e, nil
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok = s.entries[key]
	if ok && now.Before(e.expire) {
		return e, nil
	}
	var doc entryDoc
	if err := s.coll.FindId(key).One(&doc); err != nil {
		if err != mgo.ErrNotFound {
			return entry{}, errgo.Notef(err, "cannot retrieve data for key %q", key)
		}
	}
	e = entry{
		value:  doc.Value.Data,
		expire: now.Add(s.cacheLifetime),
	}
	s.entries[key] = e
	return e, nil
}