File: progress.go

package info (click to toggle)
singularity-container 4.1.5%2Bds4-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 43,876 kB
  • sloc: asm: 14,840; sh: 3,190; ansic: 1,751; awk: 414; makefile: 413; python: 99
file content (145 lines) | stat: -rw-r--r-- 3,655 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package pullprogress

import (
	"context"
	"io"
	"time"

	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/remotes"
	cerrdefs "github.com/containerd/errdefs"
	"github.com/moby/buildkit/util/bklog"
	"github.com/moby/buildkit/util/progress"
	ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

type PullManager interface {
	content.IngestManager
	content.Manager
}

type ProviderWithProgress struct {
	Provider content.Provider
	Manager  PullManager
}

func (p *ProviderWithProgress) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
	ra, err := p.Provider.ReaderAt(ctx, desc)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancelCause(ctx)
	doneCh := make(chan struct{})
	go trackProgress(ctx, desc, p.Manager, doneCh)
	return readerAtWithCancel{ReaderAt: ra, cancel: cancel, doneCh: doneCh, logger: bklog.G(ctx)}, nil
}

type readerAtWithCancel struct {
	content.ReaderAt
	cancel func(error)
	doneCh <-chan struct{}
	logger *logrus.Entry
}

func (ra readerAtWithCancel) Close() error {
	ra.cancel(errors.WithStack(context.Canceled))
	select {
	case <-ra.doneCh:
	case <-time.After(time.Second):
		ra.logger.Warn("timeout waiting for pull progress to complete")
	}
	return ra.ReaderAt.Close()
}

type FetcherWithProgress struct {
	Fetcher remotes.Fetcher
	Manager PullManager
}

func (f *FetcherWithProgress) Fetch(ctx context.Context, desc ocispecs.Descriptor) (io.ReadCloser, error) {
	rc, err := f.Fetcher.Fetch(ctx, desc)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancelCause(ctx)
	doneCh := make(chan struct{})
	go trackProgress(ctx, desc, f.Manager, doneCh)
	return readerWithCancel{ReadCloser: rc, cancel: cancel, doneCh: doneCh, logger: bklog.G(ctx)}, nil
}

type readerWithCancel struct {
	io.ReadCloser
	cancel func(error)
	doneCh <-chan struct{}
	logger *logrus.Entry
}

func (r readerWithCancel) Close() error {
	r.cancel(errors.WithStack(context.Canceled))
	select {
	case <-r.doneCh:
	case <-time.After(time.Second):
		r.logger.Warn("timeout waiting for pull progress to complete")
	}
	return r.ReadCloser.Close()
}

func trackProgress(ctx context.Context, desc ocispecs.Descriptor, manager PullManager, doneCh chan<- struct{}) {
	defer close(doneCh)

	ticker := time.NewTicker(150 * time.Millisecond)
	defer ticker.Stop()
	go func(ctx context.Context) {
		<-ctx.Done()
		ticker.Stop()
	}(ctx)

	pw, _, _ := progress.NewFromContext(ctx)
	defer pw.Close()

	ingestRef := remotes.MakeRefKey(ctx, desc)

	started := time.Now()
	onFinalStatus := false
	for !onFinalStatus {
		select {
		case <-ctx.Done():
			onFinalStatus = true
			// we need a context for the manager.Status() calls to pass once. after that this function will exit
			ctx = context.TODO()
		case <-ticker.C:
		}

		status, err := manager.Status(ctx, ingestRef)
		if err == nil {
			pw.Write(desc.Digest.String(), progress.Status{
				Current: int(status.Offset),
				Total:   int(status.Total),
				Started: &started,
			})
			continue
		} else if !errors.Is(err, cerrdefs.ErrNotFound) {
			bklog.G(ctx).Errorf("unexpected error getting ingest status of %q: %v", ingestRef, err)
			return
		}

		info, err := manager.Info(ctx, desc.Digest)
		if err == nil {
			// info.CreatedAt could be before started if parallel pull just completed
			if info.CreatedAt.Before(started) {
				started = info.CreatedAt
			}
			pw.Write(desc.Digest.String(), progress.Status{
				Current:   int(info.Size),
				Total:     int(info.Size),
				Started:   &started,
				Completed: &info.CreatedAt,
			})
			return
		}
	}
}