1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
|
package ioset
import (
"io"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Pipe returns a pair of piped readers and writers collection.
// They are useful for controlling stdio stream using Forwarder function.
func Pipe() (In, Out) {
r1, w1 := io.Pipe()
r2, w2 := io.Pipe()
r3, w3 := io.Pipe()
return In{r1, w2, w3}, Out{w1, r2, r3}
}
type In struct {
Stdin io.ReadCloser
Stdout io.WriteCloser
Stderr io.WriteCloser
}
func (s In) Close() (retErr error) {
if err := s.Stdin.Close(); err != nil {
retErr = err
}
if err := s.Stdout.Close(); err != nil {
retErr = err
}
if err := s.Stderr.Close(); err != nil {
retErr = err
}
return
}
type Out struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
}
func (s Out) Close() (retErr error) {
if err := s.Stdin.Close(); err != nil {
retErr = err
}
if err := s.Stdout.Close(); err != nil {
retErr = err
}
if err := s.Stderr.Close(); err != nil {
retErr = err
}
return
}
// Forwarder forwards IO between readers and writers contained
// in In and Out structs.
// In and Out can be changed during forwarding using SetIn and SetOut methods.
type Forwarder struct {
stdin *SingleForwarder
stdout *SingleForwarder
stderr *SingleForwarder
mu sync.Mutex
// PropagateStdinClose indicates whether EOF from Stdin of Out should be propagated.
// If this is true, EOF from Stdin (reader) of Out closes Stdin (writer) of In.
PropagateStdinClose bool
}
func NewForwarder() *Forwarder {
return &Forwarder{
stdin: NewSingleForwarder(),
stdout: NewSingleForwarder(),
stderr: NewSingleForwarder(),
PropagateStdinClose: true,
}
}
func (f *Forwarder) Close() (retErr error) {
if err := f.stdin.Close(); err != nil {
retErr = err
}
if err := f.stdout.Close(); err != nil {
retErr = err
}
if err := f.stderr.Close(); err != nil {
retErr = err
}
return retErr
}
func (f *Forwarder) SetOut(out *Out) {
f.mu.Lock()
if out == nil {
f.stdin.SetWriter(nil, func() io.WriteCloser { return nil })
f.stdout.SetReader(nil)
f.stderr.SetReader(nil)
} else {
f.stdin.SetWriter(out.Stdin, func() io.WriteCloser {
if f.PropagateStdinClose {
out.Stdin.Close() // propagate EOF
logrus.Debug("forwarder: propagating stdin close")
return nil
}
return out.Stdin
})
f.stdout.SetReader(out.Stdout)
f.stderr.SetReader(out.Stderr)
}
f.mu.Unlock()
}
func (f *Forwarder) SetIn(in *In) {
f.mu.Lock()
if in == nil {
f.stdin.SetReader(nil)
f.stdout.SetWriter(nil, func() io.WriteCloser { return nil })
f.stderr.SetWriter(nil, func() io.WriteCloser { return nil })
} else {
f.stdin.SetReader(in.Stdin)
f.stdout.SetWriter(in.Stdout, func() io.WriteCloser {
return in.Stdout // continue write; TODO: make it configurable if needed
})
f.stderr.SetWriter(in.Stderr, func() io.WriteCloser {
return in.Stderr // continue write; TODO: make it configurable if needed
})
}
f.mu.Unlock()
}
// SingleForwarder forwards IO from a reader to a writer.
// The reader and writer can be changed during forwarding
// using SetReader and SetWriter methods.
type SingleForwarder struct {
curR io.ReadCloser // closed when set another reader
curRMu sync.Mutex
curW io.WriteCloser // closed when set another writer
curWEOFHandler func() io.WriteCloser
curWMu sync.Mutex
updateRCh chan io.ReadCloser
doneCh chan struct{}
closeOnce sync.Once
}
func NewSingleForwarder() *SingleForwarder {
f := &SingleForwarder{
updateRCh: make(chan io.ReadCloser),
doneCh: make(chan struct{}),
}
go f.doForward()
return f
}
func (f *SingleForwarder) doForward() {
var r io.ReadCloser
for {
readerInvalid := false
var readerInvalidMu sync.Mutex
copyReaderToWriter := false
if r != nil {
copyReaderToWriter = true
}
if copyReaderToWriter {
srcR := r
go func() {
buf := make([]byte, 4096)
readerClosed := false
for {
n, readErr := srcR.Read(buf)
if readErr != nil {
srcR.Close()
readerClosed = true
if !errors.Is(readErr, io.EOF) && !errors.Is(readErr, io.ErrClosedPipe) {
logrus.Debugf("single forwarder: reader error: %v", readErr)
return
}
}
f.curWMu.Lock()
w := f.curW
f.curWMu.Unlock()
if w != nil {
if _, err := w.Write(buf[:n]); err != nil && !errors.Is(err, io.ErrClosedPipe) {
logrus.Debugf("single forwarder: writer error: %v", err)
}
}
readerInvalidMu.Lock()
ri := readerInvalid
readerInvalidMu.Unlock()
if ri || readerClosed {
return
}
if readErr != io.EOF {
logrus.Debugf("unknown error: %v\n", readErr)
continue
}
f.curWMu.Lock()
var newW io.WriteCloser
if f.curWEOFHandler != nil {
newW = f.curWEOFHandler()
}
f.curW = newW
f.curWMu.Unlock()
return
}
}()
}
select {
case newR := <-f.updateRCh:
f.curRMu.Lock()
if f.curR != nil {
f.curR.Close()
}
f.curR = newR
r = newR
readerInvalidMu.Lock()
readerInvalid = true
readerInvalidMu.Unlock()
f.curRMu.Unlock()
case <-f.doneCh:
return
}
}
}
// Close closes the both of registered reader and writer and finishes the forwarder.
func (f *SingleForwarder) Close() (retErr error) {
f.closeOnce.Do(func() {
f.curRMu.Lock()
r := f.curR
f.curR = nil
f.curRMu.Unlock()
if r != nil {
if err := r.Close(); err != nil {
retErr = err
}
}
// TODO: Wait until read data fully written to the current writer if needed.
f.curWMu.Lock()
w := f.curW
f.curW = nil
f.curWMu.Unlock()
if w != nil {
if err := w.Close(); err != nil {
retErr = err
}
}
close(f.doneCh)
})
return retErr
}
// SetWriter sets the specified writer as the forward destination.
// If curWEOFHandler isn't nil, this will be called when the current reader returns EOF.
func (f *SingleForwarder) SetWriter(w io.WriteCloser, curWEOFHandler func() io.WriteCloser) {
f.curWMu.Lock()
if f.curW != nil {
// close all stream on the current IO no to mix with the new IO
f.curW.Close()
}
f.curW = w
f.curWEOFHandler = curWEOFHandler
f.curWMu.Unlock()
}
// SetWriter sets the specified reader as the forward source.
func (f *SingleForwarder) SetReader(r io.ReadCloser) {
f.updateRCh <- r
}
|