1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
package xfer // import "github.com/docker/docker/distribution/xfer"
import (
"context"
"errors"
"time"
"github.com/containerd/log"
"github.com/docker/distribution"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progress"
)
const maxUploadAttempts = 5
// LayerUploadManager provides task management and progress reporting for
// uploads.
type LayerUploadManager struct {
tm *transferManager
waitDuration time.Duration
}
// SetConcurrency sets the max concurrent uploads for each push
func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
lum.tm.setConcurrency(concurrency)
}
// NewLayerUploadManager returns a new LayerUploadManager.
func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadManager)) *LayerUploadManager {
manager := LayerUploadManager{
tm: newTransferManager(concurrencyLimit),
waitDuration: time.Second,
}
for _, option := range options {
option(&manager)
}
return &manager
}
type uploadTransfer struct {
transfer
remoteDescriptor distribution.Descriptor
err error
}
// An UploadDescriptor references a layer that may need to be uploaded.
type UploadDescriptor interface {
// Key returns the key used to deduplicate uploads.
Key() string
// ID returns the ID for display purposes.
ID() string
// DiffID should return the DiffID for this layer.
DiffID() layer.DiffID
// Upload is called to perform the Upload.
Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error)
// SetRemoteDescriptor provides the distribution.Descriptor that was
// returned by Upload. This descriptor is not to be confused with
// the UploadDescriptor interface, which is used for internally
// identifying layers that are being uploaded.
SetRemoteDescriptor(descriptor distribution.Descriptor)
}
// Upload is a blocking function which ensures the listed layers are present on
// the remote registry. It uses the string returned by the Key method to
// deduplicate uploads.
func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) error {
var (
uploads []*uploadTransfer
dedupDescriptors = make(map[string]*uploadTransfer)
)
for _, descriptor := range layers {
progress.Update(progressOutput, descriptor.ID(), "Preparing")
key := descriptor.Key()
if _, present := dedupDescriptors[key]; present {
continue
}
xferFunc := lum.makeUploadFunc(descriptor)
upload, watcher := lum.tm.transfer(descriptor.Key(), xferFunc, progressOutput)
defer upload.release(watcher)
uploads = append(uploads, upload.(*uploadTransfer))
dedupDescriptors[key] = upload.(*uploadTransfer)
}
for _, upload := range uploads {
select {
case <-ctx.Done():
return ctx.Err()
case <-upload.transfer.done():
if upload.err != nil {
return upload.err
}
}
}
for _, l := range layers {
l.SetRemoteDescriptor(dedupDescriptors[l.Key()].remoteDescriptor)
}
return nil
}
func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) doFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
u := &uploadTransfer{
transfer: newTransfer(),
}
go func() {
defer func() {
close(progressChan)
}()
progressOutput := progress.ChanOutput(progressChan)
select {
case <-start:
default:
progress.Update(progressOutput, descriptor.ID(), "Waiting")
<-start
}
retries := 0
for {
remoteDescriptor, err := descriptor.Upload(u.transfer.context(), progressOutput)
if err == nil {
u.remoteDescriptor = remoteDescriptor
break
}
// If an error was returned because the context
// was cancelled, we shouldn't retry.
select {
case <-u.transfer.context().Done():
u.err = err
return
default:
}
retries++
if _, isDNR := err.(DoNotRetry); isDNR || retries == maxUploadAttempts {
log.G(context.TODO()).Errorf("Upload failed: %v", err)
u.err = err
return
}
log.G(context.TODO()).Errorf("Upload failed, retrying: %v", err)
delay := retries * 5
ticker := time.NewTicker(lum.waitDuration)
selectLoop:
for {
progress.Updatef(progressOutput, descriptor.ID(), "Retrying in %d second%s", delay, (map[bool]string{true: "s"})[delay != 1])
select {
case <-ticker.C:
delay--
if delay == 0 {
ticker.Stop()
break selectLoop
}
case <-u.transfer.context().Done():
ticker.Stop()
u.err = errors.New("upload cancelled during retry delay")
return
}
}
}
}()
return u
}
}
|