1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
package stream // import "github.com/docker/docker/container/stream"
import (
"context"
"io"
"github.com/containerd/log"
"github.com/docker/docker/pkg/pools"
"github.com/moby/term"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// defaultEscapeSequence is the detach key sequence copyEscapable falls back
// to when the caller supplies no keys: ctrl-p (0x10) followed by ctrl-q (0x11).
var defaultEscapeSequence = []byte{0x10, 0x11}
// AttachConfig describes how a client is attached to a stream's stdio: which
// streams were requested, the container-side pipes, and the client-side
// streams that data is copied to and from.
type AttachConfig struct {
	// TTY marks the stream's stdin as a TTY. When set, stdin is scanned for
	// the detach escape sequence, and that sequence is not forwarded to the
	// underlying stream.
	TTY bool

	// DetachKeys is the detach key sequence the client will use.
	// It only matters when TTY is true; when empty, a default sequence
	// (ctrl-p, ctrl-q) is used.
	DetachKeys []byte

	// CloseStdin requests that stdin of the attached stream be closed once
	// copying from the client is done. For example, this would close the
	// attached container's stdin.
	CloseStdin bool

	// UseStdin, UseStdout and UseStderr record whether the client asked to be
	// connected to the given stream. They are consulted instead of checking
	// Std* != nil at points before the client streams Std* are wired up.
	UseStdin  bool
	UseStdout bool
	UseStderr bool

	// CStdin is the stream directly connected to the container's stdin.
	CStdin io.WriteCloser
	// CStdout and CStderr are the streams directly connected to the
	// container's stdout and stderr.
	CStdout io.ReadCloser
	CStderr io.ReadCloser

	// Stdin is the client stream that feeds the container's stdin.
	Stdin io.ReadCloser
	// Stdout and Stderr are the client streams that receive the container's
	// output.
	Stdout io.Writer
	Stderr io.Writer
}
// AttachStreams wires the container-side pipes of c into cfg.
//
// For every stream the client requested (cfg.UseStdin, cfg.UseStdout,
// cfg.UseStderr) the matching cfg.CStd* field is set from the corresponding
// pipe accessor on c. Fields for streams that were not requested are left
// untouched.
func (c *Config) AttachStreams(cfg *AttachConfig) {
	// Container stdin: the write side the client's input is copied into.
	if cfg.UseStdin {
		cfg.CStdin = c.StdinPipe()
	}

	// Container stdout: the read side copied out to the client.
	if cfg.UseStdout {
		cfg.CStdout = c.StdoutPipe()
	}

	// Container stderr: the read side copied out to the client.
	if cfg.UseStderr {
		cfg.CStderr = c.StderrPipe()
	}
}
// CopyStreams starts goroutines to copy data in and out to/from the container.
//
// One copy goroutine is started for each client stream that is set on cfg:
//   - cfg.Stdin  -> cfg.CStdin (through copyEscapable when cfg.TTY is true)
//   - cfg.CStdout -> cfg.Stdout
//   - cfg.CStderr -> cfg.Stderr
//
// The returned channel is buffered with capacity 1 and receives exactly one
// value; it is never closed. That value is:
//   - the result of waiting on all copy goroutines, if they finish before ctx
//     is done (nil when every copy ended cleanly), or
//   - once ctx is done: the copy error if there was one, otherwise ctx.Err().
//
// An io.ErrClosedPipe from a copy (including a wrapped one) is treated as a
// normal end of stream rather than an error.
func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) <-chan error {
	var group errgroup.Group

	// Connect stdin of container to the attach stdin stream.
	if cfg.Stdin != nil {
		group.Go(func() error {
			log.G(ctx).Debug("attach: stdin: begin")
			defer log.G(ctx).Debug("attach: stdin: end")

			// Runs when the stdin copy returns, before the "end" log line.
			defer func() {
				if cfg.CloseStdin && !cfg.TTY {
					cfg.CStdin.Close()
				} else {
					// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
					if cfg.CStdout != nil {
						cfg.CStdout.Close()
					}
					if cfg.CStderr != nil {
						cfg.CStderr.Close()
					}
				}
			}()

			var err error
			if cfg.TTY {
				// TTY mode: watch stdin for the detach key sequence.
				_, err = copyEscapable(cfg.CStdin, cfg.Stdin, cfg.DetachKeys)
			} else {
				_, err = pools.Copy(cfg.CStdin, cfg.Stdin)
			}
			// A closed pipe just means the other side went away; use
			// errors.Is so a wrapped sentinel is recognised as well.
			if errors.Is(err, io.ErrClosedPipe) {
				err = nil
			}
			if err != nil {
				log.G(ctx).WithError(err).Debug("error on attach stdin")
				return errors.Wrap(err, "error on attach stdin")
			}
			return nil
		})
	}

	// attachStream copies one container output pipe (streamPipe) to the
	// client writer (stream). name is only used in log and error messages.
	attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) error {
		log.G(ctx).Debugf("attach: %s: begin", name)
		defer log.G(ctx).Debugf("attach: %s: end", name)
		defer func() {
			// Make sure stdin gets closed
			if cfg.Stdin != nil {
				cfg.Stdin.Close()
			}
			streamPipe.Close()
		}()

		_, err := pools.Copy(stream, streamPipe)
		// Same closed-pipe handling as for stdin above.
		if errors.Is(err, io.ErrClosedPipe) {
			err = nil
		}
		if err != nil {
			log.G(ctx).WithError(err).Debugf("attach: %s", name)
			return errors.Wrapf(err, "error attaching %s stream", name)
		}
		return nil
	}

	if cfg.Stdout != nil {
		group.Go(func() error {
			return attachStream("stdout", cfg.Stdout, cfg.CStdout)
		})
	}
	if cfg.Stderr != nil {
		group.Go(func() error {
			return attachStream("stderr", cfg.Stderr, cfg.CStderr)
		})
	}

	// Capacity 1 so the supervising goroutine can deliver its single result
	// and exit even if the caller never reads from the channel.
	errs := make(chan error, 1)
	go func() {
		defer log.G(ctx).Debug("attach done")

		// Capacity 1 so the waiter goroutine never blocks on send when the
		// ctx.Done branch below is taken instead of the receive.
		groupErr := make(chan error, 1)
		go func() {
			groupErr <- group.Wait()
		}()

		select {
		case <-ctx.Done():
			// close all pipes
			if cfg.CStdin != nil {
				cfg.CStdin.Close()
			}
			if cfg.CStdout != nil {
				cfg.CStdout.Close()
			}
			if cfg.CStderr != nil {
				cfg.CStderr.Close()
			}
			if cfg.Stdin != nil {
				// In this case, `cfg.Stdin` is a stream from the client.
				// The way `io.Copy` works we may get stuck waiting to read from `cfg.Stdin` even if the container has exited.
				// This will cause the `io.Copy` to never return and the `group.Wait()` to never return.
				// By closing cfg.Stdin we will cause the `io.Copy` to return and the `group.Wait()` to return.
				cfg.Stdin.Close()
			}

			// Now with these closed, wait should return.
			if err := group.Wait(); err != nil {
				errs <- err
				return
			}
			errs <- ctx.Err()
		case err := <-groupErr:
			errs <- err
		}
	}()

	return errs
}
// copyEscapable copies src to dst through a term escape proxy configured with
// the detach key sequence keys, falling back to defaultEscapeSequence when
// keys is empty. src is closed when the copy returns.
//
// It returns the byte count and error reported by pools.Copy.
func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, err error) {
	detachKeys := keys
	if len(detachKeys) == 0 {
		detachKeys = defaultEscapeSequence
	}

	proxy := term.NewEscapeProxy(src, detachKeys)
	defer src.Close()

	written, err = pools.Copy(dst, proxy)
	return written, err
}
|