1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
package pullprogress
import (
"context"
"io"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/remotes"
cerrdefs "github.com/containerd/errdefs"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/progress"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type PullManager interface {
content.IngestManager
content.Manager
}
type ProviderWithProgress struct {
Provider content.Provider
Manager PullManager
}
func (p *ProviderWithProgress) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
ra, err := p.Provider.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancelCause(ctx)
doneCh := make(chan struct{})
go trackProgress(ctx, desc, p.Manager, doneCh)
return readerAtWithCancel{ReaderAt: ra, cancel: cancel, doneCh: doneCh, logger: bklog.G(ctx)}, nil
}
type readerAtWithCancel struct {
content.ReaderAt
cancel func(error)
doneCh <-chan struct{}
logger *logrus.Entry
}
func (ra readerAtWithCancel) Close() error {
ra.cancel(errors.WithStack(context.Canceled))
select {
case <-ra.doneCh:
case <-time.After(time.Second):
ra.logger.Warn("timeout waiting for pull progress to complete")
}
return ra.ReaderAt.Close()
}
type FetcherWithProgress struct {
Fetcher remotes.Fetcher
Manager PullManager
}
func (f *FetcherWithProgress) Fetch(ctx context.Context, desc ocispecs.Descriptor) (io.ReadCloser, error) {
rc, err := f.Fetcher.Fetch(ctx, desc)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancelCause(ctx)
doneCh := make(chan struct{})
go trackProgress(ctx, desc, f.Manager, doneCh)
return readerWithCancel{ReadCloser: rc, cancel: cancel, doneCh: doneCh, logger: bklog.G(ctx)}, nil
}
type readerWithCancel struct {
io.ReadCloser
cancel func(error)
doneCh <-chan struct{}
logger *logrus.Entry
}
func (r readerWithCancel) Close() error {
r.cancel(errors.WithStack(context.Canceled))
select {
case <-r.doneCh:
case <-time.After(time.Second):
r.logger.Warn("timeout waiting for pull progress to complete")
}
return r.ReadCloser.Close()
}
func trackProgress(ctx context.Context, desc ocispecs.Descriptor, manager PullManager, doneCh chan<- struct{}) {
defer close(doneCh)
ticker := time.NewTicker(150 * time.Millisecond)
defer ticker.Stop()
go func(ctx context.Context) {
<-ctx.Done()
ticker.Stop()
}(ctx)
pw, _, _ := progress.NewFromContext(ctx)
defer pw.Close()
ingestRef := remotes.MakeRefKey(ctx, desc)
started := time.Now()
onFinalStatus := false
for !onFinalStatus {
select {
case <-ctx.Done():
onFinalStatus = true
// we need a context for the manager.Status() calls to pass once. after that this function will exit
ctx = context.TODO()
case <-ticker.C:
}
status, err := manager.Status(ctx, ingestRef)
if err == nil {
pw.Write(desc.Digest.String(), progress.Status{
Current: int(status.Offset),
Total: int(status.Total),
Started: &started,
})
continue
} else if !errors.Is(err, cerrdefs.ErrNotFound) {
bklog.G(ctx).Errorf("unexpected error getting ingest status of %q: %v", ingestRef, err)
return
}
info, err := manager.Info(ctx, desc.Digest)
if err == nil {
// info.CreatedAt could be before started if parallel pull just completed
if info.CreatedAt.Before(started) {
started = info.CreatedAt
}
pw.Write(desc.Digest.String(), progress.Status{
Current: int(info.Size),
Total: int(info.Size),
Started: &started,
Completed: &info.CreatedAt,
})
return
}
}
}
|