File: daemon_dest.go

package info (click to toggle)
golang-github-containers-image 5.36.1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 5,152 kB
  • sloc: sh: 267; makefile: 100
file content (184 lines) | stat: -rw-r--r-- 6,933 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package daemon

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"

	"github.com/containers/image/v5/docker/internal/tarfile"
	"github.com/containers/image/v5/docker/reference"
	"github.com/containers/image/v5/internal/private"
	"github.com/containers/image/v5/types"
	"github.com/docker/docker/client"
	"github.com/sirupsen/logrus"
)

type daemonImageDestination struct {
	ref                  daemonReference
	mustMatchRuntimeOS   bool
	*tarfile.Destination // Implements most of types.ImageDestination
	archive              *tarfile.Writer
	// For talking to imageLoadGoroutine
	goroutineCancel context.CancelFunc
	statusChannel   <-chan error
	writer          *io.PipeWriter
	// Other state
	committed bool // writer has been closed
}

// newImageDestination returns a types.ImageDestination for the specified image reference.
func newImageDestination(ctx context.Context, sys *types.SystemContext, ref daemonReference) (private.ImageDestination, error) {
	if ref.ref == nil {
		return nil, fmt.Errorf("Invalid destination docker-daemon:%s: a destination must be a name:tag", ref.StringWithinTransport())
	}
	namedTaggedRef, ok := ref.ref.(reference.NamedTagged)
	if !ok {
		return nil, fmt.Errorf("Invalid destination docker-daemon:%s: a destination must be a name:tag", ref.StringWithinTransport())
	}

	var mustMatchRuntimeOS = true
	if sys != nil && sys.DockerDaemonHost != client.DefaultDockerHost {
		mustMatchRuntimeOS = false
	}

	c, err := newDockerClient(sys)
	if err != nil {
		return nil, fmt.Errorf("initializing docker engine client: %w", err)
	}

	reader, writer := io.Pipe()
	archive := tarfile.NewWriter(writer)
	// Commit() may never be called, so we may never read from this channel; so, make this buffered to allow imageLoadGoroutine to write status and terminate even if we never read it.
	statusChannel := make(chan error, 1)

	goroutineContext, goroutineCancel := context.WithCancel(ctx)
	go imageLoadGoroutine(goroutineContext, c, reader, statusChannel)

	d := &daemonImageDestination{
		ref:                ref,
		mustMatchRuntimeOS: mustMatchRuntimeOS,
		archive:            archive,
		goroutineCancel:    goroutineCancel,
		statusChannel:      statusChannel,
		writer:             writer,
		committed:          false,
	}
	d.Destination = tarfile.NewDestination(sys, archive, ref.Transport().Name(), namedTaggedRef, d.CommitWithOptions)
	return d, nil
}

// imageLoadGoroutine accepts tar stream on reader, sends it to c, and reports error or success by writing to statusChannel
func imageLoadGoroutine(ctx context.Context, c *client.Client, reader *io.PipeReader, statusChannel chan<- error) {
	defer c.Close()
	err := errors.New("Internal error: unexpected panic in imageLoadGoroutine")
	defer func() {
		logrus.Debugf("docker-daemon: sending done, status %v", err)
		statusChannel <- err
	}()
	defer func() {
		if err == nil {
			reader.Close()
		} else {
			if err := reader.CloseWithError(err); err != nil {
				logrus.Debugf("imageLoadGoroutine: Error during reader.CloseWithError: %v", err)
			}
		}
	}()

	err = imageLoad(ctx, c, reader)
}

// imageLoad accepts tar stream on reader and sends it to c
func imageLoad(ctx context.Context, c *client.Client, reader *io.PipeReader) error {
	resp, err := c.ImageLoad(ctx, reader, true)
	if err != nil {
		return fmt.Errorf("starting a load operation in docker engine: %w", err)
	}
	defer resp.Body.Close()

	// jsonError and jsonMessage are small subsets of docker/docker/pkg/jsonmessage.JSONError and JSONMessage,
	// copied here to minimize dependencies.
	type jsonError struct {
		Message string `json:"message,omitempty"`
	}
	type jsonMessage struct {
		Error *jsonError `json:"errorDetail,omitempty"`
	}

	dec := json.NewDecoder(resp.Body)
	for {
		var msg jsonMessage
		if err := dec.Decode(&msg); err != nil {
			if err == io.EOF {
				break
			}
			return fmt.Errorf("parsing docker load progress: %w", err)
		}
		if msg.Error != nil {
			return fmt.Errorf("docker engine reported: %q", msg.Error.Message)
		}
	}
	return nil // No error reported = success
}

// DesiredLayerCompression indicates if layers must be compressed, decompressed or preserved
func (d *daemonImageDestination) DesiredLayerCompression() types.LayerCompression {
	return types.PreserveOriginal
}

// MustMatchRuntimeOS returns true iff the destination can store only images targeted for the current runtime architecture and OS. False otherwise.
func (d *daemonImageDestination) MustMatchRuntimeOS() bool {
	return d.mustMatchRuntimeOS
}

// Close removes resources associated with an initialized ImageDestination, if any.
func (d *daemonImageDestination) Close() error {
	if !d.committed {
		logrus.Debugf("docker-daemon: Closing tar stream to abort loading")
		// In principle, goroutineCancel() should abort the HTTP request and stop the process from continuing.
		// In practice, though, various HTTP implementations used by client.Client.ImageLoad() (including
		// https://github.com/golang/net/blob/master/context/ctxhttp/ctxhttp_pre17.go and the
		// net/http version with native Context support in Go 1.7) do not always actually immediately cancel
		// the operation: they may process the HTTP request, or a part of it, to completion in a goroutine, and
		// return early if the context is canceled without terminating the goroutine at all.
		// So we need this CloseWithError to terminate sending the HTTP request Body
		// immediately, and hopefully, through terminating the sending which uses "Transfer-Encoding: chunked"" without sending
		// the terminating zero-length chunk, prevent the docker daemon from processing the tar stream at all.
		// Whether that works or not, closing the PipeWriter seems desirable in any case.
		if err := d.writer.CloseWithError(errors.New("Aborting upload, daemonImageDestination closed without a previous .CommitWithOptions()")); err != nil {
			return err
		}
	}
	d.goroutineCancel()

	return nil
}

func (d *daemonImageDestination) Reference() types.ImageReference {
	return d.ref
}

// CommitWithOptions marks the process of storing the image as successful and asks for the image to be persisted.
// WARNING: This does not have any transactional semantics:
// - Uploaded data MAY be visible to others before CommitWithOptions() is called
// - Uploaded data MAY be removed or MAY remain around if Close() is called without CommitWithOptions() (i.e. rollback is allowed but not guaranteed)
func (d *daemonImageDestination) CommitWithOptions(ctx context.Context, options private.CommitOptions) error {
	logrus.Debugf("docker-daemon: Closing tar stream")
	if err := d.archive.Close(); err != nil {
		return err
	}
	if err := d.writer.Close(); err != nil {
		return err
	}
	d.committed = true // We may still fail, but we are done sending to imageLoadGoroutine.

	logrus.Debugf("docker-daemon: Waiting for status")
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-d.statusChannel:
		return err
	}
}