1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
package processes
import (
"context"
"sync"
"sync/atomic"
"github.com/docker/buildx/build"
"github.com/docker/buildx/controller/pb"
"github.com/docker/buildx/util/ioset"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Process provides methods to control a process.
type Process struct {
inEnd *ioset.Forwarder
invokeConfig *pb.InvokeConfig
errCh chan error
processCancel func()
serveIOCancel func()
}
// ForwardIO forwards process's io to the specified reader/writer.
// Optionally specify ioCancelCallback which will be called when
// the process closes the specified IO. This will be useful for additional cleanup.
func (p *Process) ForwardIO(in *ioset.In, ioCancelCallback func()) {
p.inEnd.SetIn(in)
if f := p.serveIOCancel; f != nil {
f()
}
p.serveIOCancel = ioCancelCallback
}
// Done returns a channel where error or nil will be sent
// when the process exits.
// TODO: change this to Wait()
func (p *Process) Done() <-chan error {
return p.errCh
}
// Manager manages a set of proceses.
type Manager struct {
container atomic.Value
processes sync.Map
}
// NewManager creates and returns a Manager.
func NewManager() *Manager {
return &Manager{}
}
// Get returns the specified process.
func (m *Manager) Get(id string) (*Process, bool) {
v, ok := m.processes.Load(id)
if !ok {
return nil, false
}
return v.(*Process), true
}
// CancelRunningProcesses cancels execution of all running processes.
func (m *Manager) CancelRunningProcesses() {
var funcs []func()
m.processes.Range(func(key, value any) bool {
funcs = append(funcs, value.(*Process).processCancel)
m.processes.Delete(key)
return true
})
for _, f := range funcs {
f()
}
}
// ListProcesses lists all running processes.
func (m *Manager) ListProcesses() (res []*pb.ProcessInfo) {
m.processes.Range(func(key, value any) bool {
res = append(res, &pb.ProcessInfo{
ProcessID: key.(string),
InvokeConfig: value.(*Process).invokeConfig,
})
return true
})
return res
}
// DeleteProcess deletes the specified process.
func (m *Manager) DeleteProcess(id string) error {
p, ok := m.processes.LoadAndDelete(id)
if !ok {
return errors.Errorf("unknown process %q", id)
}
p.(*Process).processCancel()
return nil
}
// StartProcess starts a process in the container.
// When a container isn't available (i.e. first time invoking or the container has exited) or cfg.Rollback is set,
// this method will start a new container and run the process in it. Otherwise, this method starts a new process in the
// existing container.
func (m *Manager) StartProcess(pid string, resultCtx *build.ResultHandle, cfg *pb.InvokeConfig) (*Process, error) {
// Get the target result to invoke a container from
var ctr *build.Container
if a := m.container.Load(); a != nil {
ctr = a.(*build.Container)
}
if cfg.Rollback || ctr == nil || ctr.IsUnavailable() {
go m.CancelRunningProcesses()
// (Re)create a new container if this is rollback or first time to invoke a process.
if ctr != nil {
go ctr.Cancel() // Finish the existing container
}
var err error
ctr, err = build.NewContainer(context.TODO(), resultCtx, cfg)
if err != nil {
return nil, errors.Errorf("failed to create container %v", err)
}
m.container.Store(ctr)
}
// [client(ForwardIO)] <-forwarder(switchable)-> [out] <-pipe-> [in] <- [process]
in, out := ioset.Pipe()
f := ioset.NewForwarder()
f.PropagateStdinClose = false
f.SetOut(&out)
// Register process
ctx, cancel := context.WithCancel(context.TODO())
var cancelOnce sync.Once
processCancelFunc := func() { cancelOnce.Do(func() { cancel(); f.Close(); in.Close(); out.Close() }) }
p := &Process{
inEnd: f,
invokeConfig: cfg,
processCancel: processCancelFunc,
errCh: make(chan error),
}
m.processes.Store(pid, p)
go func() {
var err error
if err = ctr.Exec(ctx, cfg, in.Stdin, in.Stdout, in.Stderr); err != nil {
logrus.Debugf("process error: %v", err)
}
logrus.Debugf("finished process %s %v", pid, cfg.Entrypoint)
m.processes.Delete(pid)
processCancelFunc()
p.errCh <- err
}()
return p, nil
}
|