1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
package exec
import (
"context"
"errors"
"io"
"net"
"github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/stdcopy"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-runner/executors/docker/internal/wait"
"gitlab.com/gitlab-org/gitlab-runner/helpers/docker"
)
// conn is an interface wrapper used to generate mocks that are next used for tests
// nolint:deadcode
type conn interface {
net.Conn
}
// reader is an interface wrapper used to generate mocks that are next used for tests
// nolint:deadcode
type reader interface {
io.Reader
}
type IOStreams struct {
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
}
type Docker interface {
Exec(ctx context.Context, containerID string, streams IOStreams) error
}
// NewDocker returns a client for starting a new container and running a
// command inside of it.
//
// The context passed is used to wait for any created container to stop. This
// is likely an executor's context. This means that waits to stop are only ever
// canceled should the job be aborted (either manually, or by exceeding the
// build time).
func NewDocker(ctx context.Context, c docker.Client, waiter wait.KillWaiter, logger logrus.FieldLogger) Docker {
return &defaultDocker{
ctx: ctx,
c: c,
waiter: waiter,
logger: logger,
}
}
type defaultDocker struct {
ctx context.Context
c docker.Client
waiter wait.KillWaiter
logger logrus.FieldLogger
}
//nolint:funlen
func (d *defaultDocker) Exec(ctx context.Context, containerID string, streams IOStreams) error {
d.logger.Debugln("Attaching to container", containerID, "...")
hijacked, err := d.c.ContainerAttach(ctx, containerID, attachOptions())
if err != nil {
return err
}
defer hijacked.Close()
d.logger.Debugln("Starting container", containerID, "...")
err = d.c.ContainerStart(ctx, containerID, types.ContainerStartOptions{})
if err != nil {
return err
}
// stdout/stdin error channels, buffered intentionally so that if select{}
// below exits, the go routines don't block forever upon container exit.
stdoutErrCh := make(chan error, 1)
stdinErrCh := make(chan error, 1)
// Copy any output to the build trace
go func() {
_, errCopy := stdcopy.StdCopy(streams.Stdout, streams.Stderr, hijacked.Reader)
// this goroutine can continue even whilst StopKillWait is in flight,
// allowing a graceful stop. If reading stdout returns, we must close
// attached connection, otherwise kills can be interfered with and
// block indefinitely.
hijacked.Close()
stdoutErrCh <- errCopy
}()
// Write the input to the container and close its STDIN to get it to finish
go func() {
_, errCopy := io.Copy(hijacked.Conn, streams.Stdin)
_ = hijacked.CloseWrite()
if errCopy != nil {
stdinErrCh <- errCopy
}
}()
// Wait until either:
// - the job is aborted/cancelled/deadline exceeded
// - stdin has an error
// - stdout returns an error or nil, indicating the stream has ended and
// the container has exited
select {
case <-ctx.Done():
err = errors.New("aborted")
case err = <-stdinErrCh:
case err = <-stdoutErrCh:
}
if err != nil {
d.logger.Debugln("Container", containerID, "finished with", err)
}
// Try to gracefully stop, then kill and wait for the exit.
// Containers are stopped so that they can be reused by the job.
//
// It's very likely that at this point, the context passed to Exec has
// been cancelled, so is unable to be used. Instead, we use the context
// passed to NewDocker.
return d.waiter.StopKillWait(d.ctx, containerID, nil)
}
func attachOptions() types.ContainerAttachOptions {
return types.ContainerAttachOptions{
Stream: true,
Stdin: true,
Stdout: true,
Stderr: true,
}
}
|