1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
package xfer // import "github.com/docker/docker/distribution/xfer"
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"github.com/docker/distribution"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progress"
)
const maxUploadConcurrency = 3
type mockUploadDescriptor struct {
currentUploads *int32
diffID layer.DiffID
simulateRetries int
}
// Key returns the key used to deduplicate downloads.
func (u *mockUploadDescriptor) Key() string {
return u.diffID.String()
}
// ID returns the ID for display purposes.
func (u *mockUploadDescriptor) ID() string {
return u.diffID.String()
}
// DiffID should return the DiffID for this layer.
func (u *mockUploadDescriptor) DiffID() layer.DiffID {
return u.diffID
}
// SetRemoteDescriptor is not used in the mock.
func (u *mockUploadDescriptor) SetRemoteDescriptor(remoteDescriptor distribution.Descriptor) {
}
// Upload is called to perform the upload.
func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
if u.currentUploads != nil {
defer atomic.AddInt32(u.currentUploads, -1)
if atomic.AddInt32(u.currentUploads, 1) > maxUploadConcurrency {
return distribution.Descriptor{}, errors.New("concurrency limit exceeded")
}
}
// Sleep a bit to simulate a time-consuming upload.
for i := int64(0); i <= 10; i++ {
select {
case <-ctx.Done():
return distribution.Descriptor{}, ctx.Err()
case <-time.After(10 * time.Millisecond):
progressOutput.WriteProgress(progress.Progress{ID: u.ID(), Current: i, Total: 10})
}
}
if u.simulateRetries != 0 {
u.simulateRetries--
return distribution.Descriptor{}, errors.New("simulating retry")
}
return distribution.Descriptor{}, nil
}
func uploadDescriptors(currentUploads *int32) []UploadDescriptor {
return []UploadDescriptor{
&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"},
&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:1515325234325236634634608943609283523908626098235490238423902343"},
&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:6929356290463485374960346430698374523437683470934634534953453453"},
&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"},
&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:8159352387436803946235346346368745389534789534897538734598734987", simulateRetries: 1},
&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:4637863963478346897346987346987346789346789364879364897364987346"},
}
}
func TestSuccessfulUpload(t *testing.T) {
lum := NewLayerUploadManager(maxUploadConcurrency, func(m *LayerUploadManager) { m.waitDuration = time.Millisecond })
progressChan := make(chan progress.Progress)
progressDone := make(chan struct{})
receivedProgress := make(map[string]int64)
go func() {
for p := range progressChan {
receivedProgress[p.ID] = p.Current
}
close(progressDone)
}()
var currentUploads int32
descriptors := uploadDescriptors(¤tUploads)
err := lum.Upload(context.Background(), descriptors, progress.ChanOutput(progressChan))
if err != nil {
t.Fatalf("upload error: %v", err)
}
close(progressChan)
<-progressDone
}
func TestCancelledUpload(t *testing.T) {
lum := NewLayerUploadManager(maxUploadConcurrency, func(m *LayerUploadManager) { m.waitDuration = time.Millisecond })
progressChan := make(chan progress.Progress)
progressDone := make(chan struct{})
go func() {
for range progressChan {
}
close(progressDone)
}()
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-time.After(time.Millisecond)
cancel()
}()
descriptors := uploadDescriptors(nil)
err := lum.Upload(ctx, descriptors, progress.ChanOutput(progressChan))
if err != context.Canceled {
t.Fatal("expected upload to be cancelled")
}
close(progressChan)
<-progressDone
}
|