File: daemon_dest.go

package info (click to toggle)
golang-github-containers-image 5.28.0-4
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 5,104 kB
  • sloc: sh: 194; makefile: 73
file content (157 lines) | stat: -rw-r--r-- 6,300 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package daemon

import (
	"context"
	"errors"
	"fmt"
	"io"

	"github.com/containers/image/v5/docker/internal/tarfile"
	"github.com/containers/image/v5/docker/reference"
	"github.com/containers/image/v5/internal/private"
	"github.com/containers/image/v5/types"
	"github.com/docker/docker/client"
	"github.com/sirupsen/logrus"
)

type daemonImageDestination struct {
	ref                  daemonReference
	mustMatchRuntimeOS   bool
	*tarfile.Destination // Implements most of types.ImageDestination
	archive              *tarfile.Writer
	// For talking to imageLoadGoroutine
	goroutineCancel context.CancelFunc
	statusChannel   <-chan error
	writer          *io.PipeWriter
	// Other state
	committed bool // writer has been closed
}

// newImageDestination returns a types.ImageDestination for the specified image reference.
func newImageDestination(ctx context.Context, sys *types.SystemContext, ref daemonReference) (private.ImageDestination, error) {
	if ref.ref == nil {
		return nil, fmt.Errorf("Invalid destination docker-daemon:%s: a destination must be a name:tag", ref.StringWithinTransport())
	}
	namedTaggedRef, ok := ref.ref.(reference.NamedTagged)
	if !ok {
		return nil, fmt.Errorf("Invalid destination docker-daemon:%s: a destination must be a name:tag", ref.StringWithinTransport())
	}

	var mustMatchRuntimeOS = true
	if sys != nil && sys.DockerDaemonHost != client.DefaultDockerHost {
		mustMatchRuntimeOS = false
	}

	c, err := newDockerClient(sys)
	if err != nil {
		return nil, fmt.Errorf("initializing docker engine client: %w", err)
	}

	reader, writer := io.Pipe()
	archive := tarfile.NewWriter(writer)
	// Commit() may never be called, so we may never read from this channel; so, make this buffered to allow imageLoadGoroutine to write status and terminate even if we never read it.
	statusChannel := make(chan error, 1)

	goroutineContext, goroutineCancel := context.WithCancel(ctx)
	go imageLoadGoroutine(goroutineContext, c, reader, statusChannel)

	return &daemonImageDestination{
		ref:                ref,
		mustMatchRuntimeOS: mustMatchRuntimeOS,
		Destination:        tarfile.NewDestination(sys, archive, ref.Transport().Name(), namedTaggedRef),
		archive:            archive,
		goroutineCancel:    goroutineCancel,
		statusChannel:      statusChannel,
		writer:             writer,
		committed:          false,
	}, nil
}

// imageLoadGoroutine accepts tar stream on reader, sends it to c, and reports error or success by writing to statusChannel
func imageLoadGoroutine(ctx context.Context, c *client.Client, reader *io.PipeReader, statusChannel chan<- error) {
	defer c.Close()
	err := errors.New("Internal error: unexpected panic in imageLoadGoroutine")
	defer func() {
		logrus.Debugf("docker-daemon: sending done, status %v", err)
		statusChannel <- err
	}()
	defer func() {
		if err == nil {
			reader.Close()
		} else {
			if err := reader.CloseWithError(err); err != nil {
				logrus.Debugf("imageLoadGoroutine: Error during reader.CloseWithError: %v", err)
			}
		}
	}()

	resp, err := c.ImageLoad(ctx, reader, true)
	if err != nil {
		err = fmt.Errorf("saving image to docker engine: %w", err)
		return
	}
	defer resp.Body.Close()
}

// DesiredLayerCompression indicates if layers must be compressed, decompressed or preserved
func (d *daemonImageDestination) DesiredLayerCompression() types.LayerCompression {
	return types.PreserveOriginal
}

// MustMatchRuntimeOS returns true iff the destination can store only images targeted for the current runtime architecture and OS. False otherwise.
func (d *daemonImageDestination) MustMatchRuntimeOS() bool {
	return d.mustMatchRuntimeOS
}

// Close removes resources associated with an initialized ImageDestination, if any.
func (d *daemonImageDestination) Close() error {
	if !d.committed {
		logrus.Debugf("docker-daemon: Closing tar stream to abort loading")
		// In principle, goroutineCancel() should abort the HTTP request and stop the process from continuing.
		// In practice, though, various HTTP implementations used by client.Client.ImageLoad() (including
		// https://github.com/golang/net/blob/master/context/ctxhttp/ctxhttp_pre17.go and the
		// net/http version with native Context support in Go 1.7) do not always actually immediately cancel
		// the operation: they may process the HTTP request, or a part of it, to completion in a goroutine, and
		// return early if the context is canceled without terminating the goroutine at all.
		// So we need this CloseWithError to terminate sending the HTTP request Body
		// immediately, and hopefully, through terminating the sending which uses "Transfer-Encoding: chunked"" without sending
		// the terminating zero-length chunk, prevent the docker daemon from processing the tar stream at all.
		// Whether that works or not, closing the PipeWriter seems desirable in any case.
		if err := d.writer.CloseWithError(errors.New("Aborting upload, daemonImageDestination closed without a previous .Commit()")); err != nil {
			return err
		}
	}
	d.goroutineCancel()

	return nil
}

func (d *daemonImageDestination) Reference() types.ImageReference {
	return d.ref
}

// Commit marks the process of storing the image as successful and asks for the image to be persisted.
// unparsedToplevel contains data about the top-level manifest of the source (which may be a single-arch image or a manifest list
// if PutManifest was only called for the single-arch image with instanceDigest == nil), primarily to allow lookups by the
// original manifest list digest, if desired.
// WARNING: This does not have any transactional semantics:
// - Uploaded data MAY be visible to others before Commit() is called
// - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed)
func (d *daemonImageDestination) Commit(ctx context.Context, unparsedToplevel types.UnparsedImage) error {
	logrus.Debugf("docker-daemon: Closing tar stream")
	if err := d.archive.Close(); err != nil {
		return err
	}
	if err := d.writer.Close(); err != nil {
		return err
	}
	d.committed = true // We may still fail, but we are done sending to imageLoadGoroutine.

	logrus.Debugf("docker-daemon: Waiting for status")
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-d.statusChannel:
		return err
	}
}