1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
|
// Package boltdb implements a BlobInfoCache backed by BoltDB.
package boltdb
import (
"fmt"
"os"
"sync"
"time"
"github.com/containers/image/v5/internal/blobinfocache"
"github.com/containers/image/v5/pkg/blobinfocache/internal/prioritize"
"github.com/containers/image/v5/types"
"github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
)
// Names of the top-level buckets stored in the BoltDB file.
//
// NOTE: There is no versioning data inside the file; this is a “cache”, so on an incompatible format upgrade
// we can simply start over with a different filename; update blobInfoCacheFilename.
// FIXME: For CRI-O, does this need to hide information between different users?
var (
	// uncompressedDigestBucket maps any digest to the corresponding uncompressed digest.
	uncompressedDigestBucket = []byte("uncompressedDigest")

	// digestCompressorBucket maps any digest to a compressor name, or to blobinfocache.Uncompressed
	// (never blobinfocache.UnknownCompression, which is represented by the absence of a key).
	// Caches written by older versions may lack this bucket even when uncompressedDigestBucket exists.
	digestCompressorBucket = []byte("digestCompressor")

	// digestByUncompressedBucket holds one nested bucket per uncompressed digest; each nested bucket is a set
	// of the digests known to decompress to it, stored as key=digest, value="" pairs.
	digestByUncompressedBucket = []byte("digestByUncompressed")

	// knownLocationsBucket nests buckets keyed by transport name, then scope string, then blob digest; the innermost
	// bucket maps opaque location references to BinaryMarshaler-encoded time.Time values (when last seen).
	knownLocationsBucket = []byte("knownLocations")
)
// Concurrency:
// See https://www.sqlite.org/src/artifact/c230a7a24?ln=994-1081 for all the issues with locks, which make it extremely
// difficult to use a single BoltDB file from multiple threads/goroutines inside a process. So, we punt and only allow one at a time.
//
// pathLock contains a lock for a specific BoltDB database path.
// Instances are stored in the global pathLocks map, created by lockPath and removed by unlockPath
// once no goroutine owns or waits on them.
type pathLock struct {
	refCount int64      // Number of threads/goroutines owning or waiting on this lock. Protected by global pathLocksMutex, NOT by the mutex field below!
	mutex    sync.Mutex // Owned by the thread/goroutine allowed to access the BoltDB database.
}
// Process-wide registry of per-path locks.
//
// This must be global so that independently created instances of boltDBCache exclude each other.
// Both the map and every pathLock.refCount are guarded by pathLocksMutex.
// FIXME? Should this be based on device:inode numbers instead of paths instead?
var (
	pathLocks      = make(map[string]*pathLock) // One entry per path that is currently locked or awaited.
	pathLocksMutex sync.Mutex                   // The zero value is an unlocked mutex.
)
// lockPath obtains the pathLock for path.
// The caller must call unlockPath eventually.
func lockPath(path string) {
pl := func() *pathLock { // A scope for defer
pathLocksMutex.Lock()
defer pathLocksMutex.Unlock()
pl, ok := pathLocks[path]
if ok {
pl.refCount++
} else {
pl = &pathLock{refCount: 1, mutex: sync.Mutex{}}
pathLocks[path] = pl
}
return pl
}()
pl.mutex.Lock()
}
// unlockPath releases the in-process lock for path previously acquired by lockPath, and forgets the
// pathLock entry once no goroutine owns or waits on it any more.
// It panics if no lock is registered for path, which indicates unbalanced lockPath/unlockPath calls.
func unlockPath(path string) {
	pathLocksMutex.Lock()
	defer pathLocksMutex.Unlock()

	pl, registered := pathLocks[path]
	if !registered {
		// Should this return an error instead? BlobInfoCache ultimately ignores errors…
		panic(fmt.Sprintf("Internal error: unlocking nonexistent lock for path %s", path))
	}
	pl.mutex.Unlock()

	pl.refCount--
	if pl.refCount != 0 {
		return // Someone else is still waiting on this lock; keep the entry.
	}
	delete(pathLocks, path)
}
// cache is a BlobInfoCache implementation which uses a BoltDB file at the specified path.
//
// Note that we don’t keep the database open across operations, because that would lock the file and block any other
// users; instead, we need to open/close it for every single write or lookup.
type cache struct {
	path string // Path of the BoltDB database file; also the key used for in-process serialization via lockPath/unlockPath.
}
// New returns a BlobInfoCache implementation which uses a BoltDB file at path.
// The file is not opened or created here; that happens lazily on each lookup or write.
//
// Most users should call blobinfocache.DefaultCache instead.
//
// Deprecated: The BoltDB implementation triggers a panic() on some database format errors; that does not allow
// practical error recovery / fallback.
//
// Use blobinfocache.DefaultCache if at all possible; if not, the pkg/blobinfocache/sqlite implementation.
func New(path string) types.BlobInfoCache {
	return new2(path)
}
// new2 returns the concrete *cache for the BoltDB file at path, so that in-package callers can use methods
// that are not necessarily part of the types.BlobInfoCache interface returned by New (e.g. CandidateLocations2).
func new2(path string) *cache {
	c := new(cache)
	c.path = path
	return c
}
// Open() sets up the cache for future accesses, potentially acquiring costly state. Each Open() must be paired with a Close().
// Note that public callers may call the types.BlobInfoCache operations without Open()/Close().
//
// For this implementation Open is a no-op: view and update open and close the database on every call.
func (bdc *cache) Open() {
}
// Close destroys state created by Open().
//
// For this implementation Close is a no-op, because Open does not create any state.
func (bdc *cache) Close() {
}
// view runs fn within a read-only transaction on the database at bdc.path, serialized with all other
// in-process accesses to the same path.
//
// If the file does not exist, the os.Lstat error is returned (os.IsNotExist(err) holds) and fn is not called.
// An error from fn or from the transaction takes precedence over an error from closing the database.
func (bdc *cache) view(fn func(tx *bolt.Tx) error) (retErr error) {
	// bolt.Open(bdc.path, 0600, &bolt.Options{ReadOnly: true}) will, if the file does not exist,
	// nevertheless create it, but with an O_RDONLY file descriptor, try to initialize it, and fail — while holding
	// a read lock, blocking any future writes.
	// Hence this preliminary check, which is RACY: Another process could remove the file
	// between the Lstat call and opening the database.
	if _, statErr := os.Lstat(bdc.path); os.IsNotExist(statErr) {
		return statErr
	}

	lockPath(bdc.path)
	defer unlockPath(bdc.path)

	db, err := bolt.Open(bdc.path, 0600, &bolt.Options{ReadOnly: true})
	if err != nil {
		return err
	}
	defer func() {
		closeErr := db.Close()
		if closeErr != nil && retErr == nil {
			retErr = closeErr
		}
	}()
	return db.View(fn)
}
// update runs fn within a read-write transaction on the database at bdc.path, serialized with all other
// in-process accesses to the same path. Unlike view, the database is opened without ReadOnly, so bolt.Open
// may create the file.
//
// An error from fn or from the transaction takes precedence over an error from closing the database.
func (bdc *cache) update(fn func(tx *bolt.Tx) error) (retErr error) {
	lockPath(bdc.path)
	defer unlockPath(bdc.path)

	db, err := bolt.Open(bdc.path, 0600, nil)
	if err != nil {
		return err
	}
	defer func() {
		closeErr := db.Close()
		if retErr == nil {
			retErr = closeErr // Stays nil when closing also succeeded.
		}
	}()
	return db.Update(fn)
}
// uncompressedDigest implements BlobInfoCache.UncompressedDigest within the provided read-only transaction.
//
// It returns, in order of preference:
//   - the digest recorded for anyDigest in uncompressedDigestBucket, if that value parses as a digest;
//   - anyDigest itself, if digestByUncompressedBucket has a non-empty bucket for it (it is known to be uncompressed);
//   - "" otherwise.
func (bdc *cache) uncompressedDigest(tx *bolt.Tx, anyDigest digest.Digest) digest.Digest {
	key := []byte(anyDigest.String())

	if mapping := tx.Bucket(uncompressedDigestBucket); mapping != nil {
		if recorded := mapping.Get(key); recorded != nil {
			if d, err := digest.Parse(string(recorded)); err == nil {
				return d
			}
			// FIXME? Log err (but throttle the log volume on repeated accesses)?
		}
	}

	// Presence in digestByUncompressedBucket implies that anyDigest must already refer to an uncompressed digest.
	// This way we don't have to waste storage space with trivial (uncompressed, uncompressed) mappings
	// when we already record a (compressed, uncompressed) pair.
	byUncompressed := tx.Bucket(digestByUncompressedBucket)
	if byUncompressed == nil {
		return ""
	}
	variants := byUncompressed.Bucket(key)
	if variants == nil {
		return ""
	}
	if firstKey, _ := variants.Cursor().First(); firstKey == nil { // An empty set proves nothing.
		return ""
	}
	return anyDigest
}
// UncompressedDigest returns an uncompressed digest corresponding to anyDigest.
// May return anyDigest if it is known to be uncompressed.
// Returns "" if nothing is known about the digest (it may be compressed or uncompressed),
// and also if the database cannot be read (including when the file does not exist).
func (bdc *cache) UncompressedDigest(anyDigest digest.Digest) digest.Digest {
	var found digest.Digest
	err := bdc.view(func(tx *bolt.Tx) error {
		found = bdc.uncompressedDigest(tx, anyDigest)
		return nil
	})
	if err != nil { // Including os.IsNotExist(err)
		return "" // FIXME? Log err (but throttle the log volume on repeated accesses)?
	}
	return found
}
// RecordDigestUncompressedPair records that the uncompressed version of anyDigest is uncompressed.
// It’s allowed for anyDigest == uncompressed.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
//
// Both directions are written in a single transaction: anyDigest → uncompressed in uncompressedDigestBucket, and a
// presence marker for anyDigest inside uncompressed's nested bucket in digestByUncompressedBucket.
// A previously recorded value that does not parse as a digest is replaced (with a warning) instead of aborting the
// write, so a corrupt entry cannot permanently prevent the correct mapping from being stored.
// Errors are otherwise ignored; this is a best-effort cache.
func (bdc *cache) RecordDigestUncompressedPair(anyDigest digest.Digest, uncompressed digest.Digest) {
	_ = bdc.update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists(uncompressedDigestBucket)
		if err != nil {
			return err
		}
		key := []byte(anyDigest.String())
		if previousBytes := b.Get(key); previousBytes != nil {
			previous, err := digest.Parse(string(previousBytes))
			switch {
			case err != nil:
				// Don't abort the transaction: that would leave the corrupt value in place and make every
				// future attempt to record the correct pair fail the same way.
				logrus.Warnf("Replacing invalid uncompressed digest %q previously recorded for blob %s: %v", string(previousBytes), anyDigest, err)
			case previous != uncompressed:
				logrus.Warnf("Uncompressed digest for blob %s previously recorded as %s, now %s", anyDigest, previous, uncompressed)
			}
		}
		if err := b.Put(key, []byte(uncompressed.String())); err != nil {
			return err
		}

		b, err = tx.CreateBucketIfNotExists(digestByUncompressedBucket)
		if err != nil {
			return err
		}
		b, err = b.CreateBucketIfNotExists([]byte(uncompressed.String()))
		if err != nil {
			return err
		}
		// Possibly writing the same []byte{} presence marker again.
		return b.Put([]byte(anyDigest.String()), []byte{})
	}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}
// RecordDigestCompressorName records that the blob with digest anyDigest was compressed with the specified
// compressor, or is blobinfocache.Uncompressed.
// Recording blobinfocache.UnknownCompression removes any previously stored entry for anyDigest.
// A warning is logged when this replaces a different previously recorded value.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
func (bdc *cache) RecordDigestCompressorName(anyDigest digest.Digest, compressorName string) {
	_ = bdc.update(func(tx *bolt.Tx) error {
		compressors, err := tx.CreateBucketIfNotExists(digestCompressorBucket)
		if err != nil {
			return err
		}
		key := []byte(anyDigest.String())
		if previous := compressors.Get(key); previous != nil && string(previous) != compressorName {
			logrus.Warnf("Compressor for blob with digest %s previously recorded as %s, now %s", anyDigest, string(previous), compressorName)
		}
		if compressorName != blobinfocache.UnknownCompression {
			return compressors.Put(key, []byte(compressorName))
		}
		return compressors.Delete(key) // “Unknown” is represented by the absence of a key.
	}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}
// RecordKnownLocation records that a blob with the specified digest exists within the specified (transport, scope) scope,
// and can be reused given the opaque location data.
//
// The location is stored under knownLocationsBucket → transport name → scope → blob digest (creating buckets as
// needed), with the current time as its value; recording an already-known location refreshes that time.
func (bdc *cache) RecordKnownLocation(transport types.ImageTransport, scope types.BICTransportScope, blobDigest digest.Digest, location types.BICLocationReference) {
	_ = bdc.update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists(knownLocationsBucket)
		if err != nil {
			return err
		}
		path := [][]byte{
			[]byte(transport.Name()),
			[]byte(scope.Opaque),
			[]byte(blobDigest.String()),
		}
		for _, name := range path {
			if b, err = b.CreateBucketIfNotExists(name); err != nil {
				return err
			}
		}
		lastSeen, err := time.Now().MarshalBinary()
		if err != nil {
			return err
		}
		return b.Put([]byte(location.Opaque), lastSeen) // Possibly overwriting an older entry.
	}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}
// appendReplacementCandidates creates prioritize.CandidateWithTime values for blobDigest in scopeBucket, with
// compression info for blobDigest taken from compressionBucket (if compressionBucket is not nil), and returns the
// result of appending them to candidates.
//
// If requireCompressionInfo is set and no compressor name is recorded for blobDigest, candidates is returned unchanged.
// Location entries whose last-seen timestamp cannot be decoded are skipped individually; they do not prevent the
// remaining locations for blobDigest from being returned.
func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, blobDigest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime {
	digestKey := []byte(blobDigest.String())
	b := scopeBucket.Bucket(digestKey)
	if b == nil {
		return candidates
	}
	compressorName := blobinfocache.UnknownCompression
	if compressionBucket != nil {
		// the bucket won't exist if the cache was created by a v1 implementation and
		// hasn't yet been updated by a v2 implementation
		if compressorNameValue := compressionBucket.Get(digestKey); len(compressorNameValue) > 0 {
			compressorName = string(compressorNameValue)
		}
	}
	if compressorName == blobinfocache.UnknownCompression && requireCompressionInfo {
		return candidates
	}
	// The callback never returns an error, so ForEach cannot fail here.
	_ = b.ForEach(func(k, v []byte) error {
		var lastSeen time.Time
		if err := lastSeen.UnmarshalBinary(v); err != nil {
			// Skip just this unusable entry; returning the error would stop ForEach and silently
			// drop every later (valid) location for this digest.
			// FIXME? Log err (but throttle the log volume on repeated accesses)?
			return nil
		}
		candidates = append(candidates, prioritize.CandidateWithTime{
			Candidate: blobinfocache.BICReplacementCandidate2{
				Digest:         blobDigest,
				CompressorName: compressorName,
				Location:       types.BICLocationReference{Opaque: string(k)},
			},
			LastSeen: lastSeen,
		})
		return nil
	})
	return candidates
}
// CandidateLocations2 returns a prioritized, limited, number of blobs and their locations that could possibly be reused
// within the specified (transport scope) (if they still exist, which is not guaranteed).
// Only candidates whose compression info is recorded in the cache are returned.
//
// If !canSubstitute, the returned candidates will match the submitted digest exactly; if canSubstitute,
// data from previous RecordDigestUncompressedPair calls is used to also look up variants of the blob which have the same
// uncompressed digest.
func (bdc *cache) CandidateLocations2(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute bool) []blobinfocache.BICReplacementCandidate2 {
	const requireCompressionInfo = true // v2 callers rely on CompressorName being known.
	return bdc.candidateLocations(transport, scope, primaryDigest, canSubstitute, requireCompressionInfo)
}
// candidateLocations is the shared implementation of CandidateLocations and CandidateLocations2.
//
// It collects the known locations of primaryDigest within (transport, scope). If canSubstitute and an uncompressed
// digest is known for primaryDigest, it also collects locations of every other digest recorded as having that
// uncompressed digest, and of the uncompressed digest itself. The result is then prioritized.
// If requireCompressionInfo, digests without recorded compression info are skipped.
// Any database error (including a missing database file) yields an empty, non-nil slice.
func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, requireCompressionInfo bool) []blobinfocache.BICReplacementCandidate2 {
	res := []prioritize.CandidateWithTime{}
	var uncompressedDigestValue digest.Digest // = ""
	err := bdc.view(func(tx *bolt.Tx) error {
		var scopeBucket *bolt.Bucket
		if locations := tx.Bucket(knownLocationsBucket); locations != nil {
			if perTransport := locations.Bucket([]byte(transport.Name())); perTransport != nil {
				scopeBucket = perTransport.Bucket([]byte(scope.Opaque))
			}
		}
		if scopeBucket == nil {
			return nil // Nothing recorded for this (transport, scope).
		}
		// compressionBucket won't have been created if previous writers never recorded info about compression,
		// and we don't want to fail just because of that
		compressionBucket := tx.Bucket(digestCompressorBucket)

		res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, primaryDigest, requireCompressionInfo)
		if !canSubstitute {
			return nil
		}
		uncompressedDigestValue = bdc.uncompressedDigest(tx, primaryDigest)
		if uncompressedDigestValue == "" {
			return nil
		}

		if byUncompressed := tx.Bucket(digestByUncompressedBucket); byUncompressed != nil {
			if variants := byUncompressed.Bucket([]byte(uncompressedDigestValue.String())); variants != nil {
				if err := variants.ForEach(func(k, _ []byte) error {
					d, err := digest.Parse(string(k))
					if err != nil {
						return err
					}
					// primaryDigest was handled above and uncompressedDigestValue is handled below.
					if d != primaryDigest && d != uncompressedDigestValue {
						res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, d, requireCompressionInfo)
					}
					return nil
				}); err != nil {
					return err
				}
			}
		}
		if uncompressedDigestValue != primaryDigest {
			res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, uncompressedDigestValue, requireCompressionInfo)
		}
		return nil
	})
	if err != nil { // Including os.IsNotExist(err)
		return []blobinfocache.BICReplacementCandidate2{} // FIXME? Log err (but throttle the log volume on repeated accesses)?
	}
	return prioritize.DestructivelyPrioritizeReplacementCandidates(res, primaryDigest, uncompressedDigestValue)
}
// CandidateLocations returns a prioritized, limited, number of blobs and their locations that could possibly be reused
// within the specified (transport scope) (if they still exist, which is not guaranteed).
// Unlike CandidateLocations2, candidates without recorded compression info are included.
//
// If !canSubstitute, the returned candidates will match the submitted digest exactly; if canSubstitute,
// data from previous RecordDigestUncompressedPair calls is used to also look up variants of the blob which have the same
// uncompressed digest.
func (bdc *cache) CandidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute bool) []types.BICReplacementCandidate {
	const requireCompressionInfo = false // Legacy callers do not use compression info.
	v2Candidates := bdc.candidateLocations(transport, scope, primaryDigest, canSubstitute, requireCompressionInfo)
	return blobinfocache.CandidateLocationsFromV2(v2Candidates)
}
|