File: src.go

package info (click to toggle)
golang-github-containers-image 5.28.0-4
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 5,104 kB
  • sloc: sh: 194; makefile: 73
file content (270 lines) | stat: -rw-r--r-- 9,496 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
package blobcache

import (
	"context"
	"fmt"
	"io"
	"os"
	"sync"

	"github.com/containers/image/v5/internal/image"
	"github.com/containers/image/v5/internal/imagesource"
	"github.com/containers/image/v5/internal/imagesource/impl"
	"github.com/containers/image/v5/internal/manifest"
	"github.com/containers/image/v5/internal/private"
	"github.com/containers/image/v5/internal/signature"
	"github.com/containers/image/v5/pkg/compression"
	"github.com/containers/image/v5/transports"
	"github.com/containers/image/v5/types"
	digest "github.com/opencontainers/go-digest"
	v1 "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/sirupsen/logrus"
)

type blobCacheSource struct {
	impl.Compat

	reference *BlobCache
	source    private.ImageSource
	sys       types.SystemContext
	// this mutex synchronizes the counters below
	mu          sync.Mutex
	cacheHits   int64
	cacheMisses int64
	cacheErrors int64
}

func (b *BlobCache) NewImageSource(ctx context.Context, sys *types.SystemContext) (types.ImageSource, error) {
	src, err := b.reference.NewImageSource(ctx, sys)
	if err != nil {
		return nil, fmt.Errorf("error creating new image source %q: %w", transports.ImageName(b.reference), err)
	}
	logrus.Debugf("starting to read from image %q using blob cache in %q (compression=%v)", transports.ImageName(b.reference), b.directory, b.compress)
	s := &blobCacheSource{reference: b, source: imagesource.FromPublic(src), sys: *sys}
	s.Compat = impl.AddCompat(s)
	return s, nil
}

func (s *blobCacheSource) Reference() types.ImageReference {
	return s.reference
}

func (s *blobCacheSource) Close() error {
	logrus.Debugf("finished reading from image %q using blob cache: cache had %d hits, %d misses, %d errors", transports.ImageName(s.reference), s.cacheHits, s.cacheMisses, s.cacheErrors)
	return s.source.Close()
}

func (s *blobCacheSource) GetManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) {
	if instanceDigest != nil {
		filename := s.reference.blobPath(*instanceDigest, false)
		manifestBytes, err := os.ReadFile(filename)
		if err == nil {
			s.cacheHits++
			return manifestBytes, manifest.GuessMIMEType(manifestBytes), nil
		}
		if !os.IsNotExist(err) {
			s.cacheErrors++
			return nil, "", fmt.Errorf("checking for manifest file: %w", err)
		}
	}
	s.cacheMisses++
	return s.source.GetManifest(ctx, instanceDigest)
}

func (s *blobCacheSource) HasThreadSafeGetBlob() bool {
	return s.source.HasThreadSafeGetBlob()
}

func (s *blobCacheSource) GetBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) {
	blobPath, size, _, err := s.reference.findBlob(blobinfo)
	if err != nil {
		return nil, -1, err
	}
	if blobPath != "" {
		f, err := os.Open(blobPath)
		if err == nil {
			s.mu.Lock()
			s.cacheHits++
			s.mu.Unlock()
			return f, size, nil
		}
		if !os.IsNotExist(err) {
			s.mu.Lock()
			s.cacheErrors++
			s.mu.Unlock()
			return nil, -1, fmt.Errorf("checking for cache: %w", err)
		}
	}
	s.mu.Lock()
	s.cacheMisses++
	s.mu.Unlock()
	rc, size, err := s.source.GetBlob(ctx, blobinfo, cache)
	if err != nil {
		return rc, size, fmt.Errorf("error reading blob from source image %q: %w", transports.ImageName(s.reference), err)
	}
	return rc, size, nil
}

// GetSignaturesWithFormat returns the image's signatures.  It may use a remote (= slow) service.
// If instanceDigest is not nil, it contains a digest of the specific manifest instance to retrieve signatures for
// (when the primary manifest is a manifest list); this never happens if the primary manifest is not a manifest list
// (e.g. if the source never returns manifest lists).
func (s *blobCacheSource) GetSignaturesWithFormat(ctx context.Context, instanceDigest *digest.Digest) ([]signature.Signature, error) {
	return s.source.GetSignaturesWithFormat(ctx, instanceDigest)
}

func (s *blobCacheSource) LayerInfosForCopy(ctx context.Context, instanceDigest *digest.Digest) ([]types.BlobInfo, error) {
	signatures, err := s.source.GetSignaturesWithFormat(ctx, instanceDigest)
	if err != nil {
		return nil, fmt.Errorf("error checking if image %q has signatures: %w", transports.ImageName(s.reference), err)
	}
	canReplaceBlobs := len(signatures) == 0

	infos, err := s.source.LayerInfosForCopy(ctx, instanceDigest)
	if err != nil {
		return nil, fmt.Errorf("error getting layer infos for copying image %q through cache: %w", transports.ImageName(s.reference), err)
	}
	if infos == nil {
		img, err := image.FromUnparsedImage(ctx, &s.sys, image.UnparsedInstance(s.source, instanceDigest))
		if err != nil {
			return nil, fmt.Errorf("error opening image to get layer infos for copying image %q through cache: %w", transports.ImageName(s.reference), err)
		}
		infos = img.LayerInfos()
	}

	if canReplaceBlobs && s.reference.compress != types.PreserveOriginal {
		replacedInfos := make([]types.BlobInfo, 0, len(infos))
		for _, info := range infos {
			var replaceDigest []byte
			var err error
			blobFile := s.reference.blobPath(info.Digest, false)
			var alternate string
			switch s.reference.compress {
			case types.Compress:
				alternate = blobFile + compressedNote
				replaceDigest, err = os.ReadFile(alternate)
			case types.Decompress:
				alternate = blobFile + decompressedNote
				replaceDigest, err = os.ReadFile(alternate)
			}
			if err == nil && digest.Digest(replaceDigest).Validate() == nil {
				alternate = s.reference.blobPath(digest.Digest(replaceDigest), false)
				fileInfo, err := os.Stat(alternate)
				if err == nil {
					switch info.MediaType {
					case v1.MediaTypeImageLayer, v1.MediaTypeImageLayerGzip:
						switch s.reference.compress {
						case types.Compress:
							info.MediaType = v1.MediaTypeImageLayerGzip
							info.CompressionAlgorithm = &compression.Gzip
						case types.Decompress: // FIXME: This should remove zstd:chunked annotations (but those annotations being left with incorrect values should not break pulls)
							info.MediaType = v1.MediaTypeImageLayer
							info.CompressionAlgorithm = nil
						}
					case manifest.DockerV2SchemaLayerMediaTypeUncompressed, manifest.DockerV2Schema2LayerMediaType:
						switch s.reference.compress {
						case types.Compress:
							info.MediaType = manifest.DockerV2Schema2LayerMediaType
							info.CompressionAlgorithm = &compression.Gzip
						case types.Decompress:
							// nope, not going to suggest anything, it's not allowed by the spec
							replacedInfos = append(replacedInfos, info)
							continue
						}
					}
					logrus.Debugf("suggesting cached blob with digest %q, type %q, and compression %v in place of blob with digest %q", string(replaceDigest), info.MediaType, s.reference.compress, info.Digest.String())
					info.CompressionOperation = s.reference.compress
					info.Digest = digest.Digest(replaceDigest)
					info.Size = fileInfo.Size()
					logrus.Debugf("info = %#v", info)
				}
			}
			replacedInfos = append(replacedInfos, info)
		}
		infos = replacedInfos
	}

	return infos, nil
}

// SupportsGetBlobAt() returns true if GetBlobAt (BlobChunkAccessor) is supported.
func (s *blobCacheSource) SupportsGetBlobAt() bool {
	return s.source.SupportsGetBlobAt()
}

// streamChunksFromFile generates the channels returned by GetBlobAt for chunks of seekable file
func streamChunksFromFile(streams chan io.ReadCloser, errs chan error, file io.ReadSeekCloser,
	chunks []private.ImageSourceChunk) {
	defer close(streams)
	defer close(errs)
	defer file.Close()

	for _, c := range chunks {
		// Always seek to the desired offset; that way we don’t need to care about the consumer
		// not reading all of the chunk, or about the position going backwards.
		if _, err := file.Seek(int64(c.Offset), io.SeekStart); err != nil {
			errs <- err
			break
		}
		s := signalCloseReader{
			closed: make(chan struct{}),
			stream: io.LimitReader(file, int64(c.Length)),
		}
		streams <- s

		// Wait until the stream is closed before going to the next chunk
		<-s.closed
	}
}

type signalCloseReader struct {
	closed chan struct{}
	stream io.Reader
}

func (s signalCloseReader) Read(p []byte) (int, error) {
	return s.stream.Read(p)
}

func (s signalCloseReader) Close() error {
	close(s.closed)
	return nil
}

// GetBlobAt returns a sequential channel of readers that contain data for the requested
// blob chunks, and a channel that might get a single error value.
// The specified chunks must be not overlapping and sorted by their offset.
// The readers must be fully consumed, in the order they are returned, before blocking
// to read the next chunk.
func (s *blobCacheSource) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
	blobPath, _, _, err := s.reference.findBlob(info)
	if err != nil {
		return nil, nil, err
	}
	if blobPath != "" {
		f, err := os.Open(blobPath)
		if err == nil {
			s.mu.Lock()
			s.cacheHits++
			s.mu.Unlock()
			streams := make(chan io.ReadCloser)
			errs := make(chan error)
			go streamChunksFromFile(streams, errs, f, chunks)
			return streams, errs, nil
		}
		if !os.IsNotExist(err) {
			s.mu.Lock()
			s.cacheErrors++
			s.mu.Unlock()
			return nil, nil, fmt.Errorf("checking for cache: %w", err)
		}
	}
	s.mu.Lock()
	s.cacheMisses++
	s.mu.Unlock()
	streams, errs, err := s.source.GetBlobAt(ctx, info, chunks)
	if err != nil {
		return streams, errs, fmt.Errorf("error reading blob chunks from source image %q: %w", transports.ImageName(s.reference), err)
	}
	return streams, errs, nil
}