1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
package build
import (
"context"
_ "crypto/sha256" // ensure digests can be computed
"io"
"sync"
"sync/atomic"
"syscall"
controllerapi "github.com/docker/buildx/controller/pb"
gateway "github.com/moby/buildkit/frontend/gateway/client"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Container is a handle to a buildkit gateway container created by
// NewContainer. Processes are started in it through Exec, and it is torn
// down through Cancel.
type Container struct {
	// cancelOnce ensures the teardown performed by Cancel (cancelling the
	// container context and closing releaseCh) runs at most once.
	cancelOnce sync.Once
	// containerCancel cancels the context the container was created with.
	// Cancel skips it when nil.
	containerCancel func()
	// isUnavailable is set once the container must no longer be used: after
	// Cancel, after a failed Exec, or after the first Exec call returns.
	isUnavailable atomic.Bool
	// initStarted records whether Exec has been called at least once; the
	// first call is treated as the container's init process.
	initStarted atomic.Bool
	// container is the underlying gateway container processes are started in.
	container gateway.Container
	// releaseCh is closed by Cancel; NewContainer's build callback waits on it
	// before releasing the container.
	releaseCh chan struct{}
	// resultCtx is the result handle the container was created from; exec uses
	// it to build the process configuration.
	resultCtx *ResultHandle
}
// NewContainer creates a gateway container from the result held by resultCtx,
// configured by cfg, and returns a handle to it.
//
// The container lives inside a build callback that runs on its own goroutine:
// the callback creates the container, hands it to the caller, then blocks
// until Cancel closes the container's release channel, and finally releases
// the container. A cleanup function registered on resultCtx cancels the
// container and waits for the callback to finish.
//
// NewContainer returns the error reported by the build if the container could
// not be created, or ctx.Err() if ctx is done before a container or an error
// is available.
func NewContainer(ctx context.Context, resultCtx *ResultHandle, cfg *controllerapi.InvokeConfig) (*Container, error) {
	mainCtx := ctx
	ctrCh := make(chan *Container)
	// Buffered so the build goroutine can always deliver its final error and
	// exit, even when nobody is receiving any more (for example when Release
	// fails long after the container was handed to the caller).
	errCh := make(chan error, 1)
	go func() {
		err := resultCtx.build(func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
			ctx, cancel := context.WithCancel(ctx)
			// Runs after every other deferred call below; it also stops the
			// watcher goroutine once the callback has finished.
			defer cancel()
			go func() {
				// Propagate the caller's cancellation into the build, but stop
				// watching as soon as the callback's own context is done so
				// this goroutine cannot outlive the callback.
				select {
				case <-mainCtx.Done():
					cancel()
				case <-ctx.Done():
				}
			}()

			containerCfg, err := resultCtx.getContainerConfig(ctx, c, cfg)
			if err != nil {
				return nil, err
			}
			containerCtx, containerCancel := context.WithCancel(ctx)
			defer containerCancel()
			bkContainer, err := c.NewContainer(containerCtx, containerCfg)
			if err != nil {
				return nil, err
			}

			releaseCh := make(chan struct{})
			container := &Container{
				containerCancel: containerCancel,
				container:       bkContainer,
				releaseCh:       releaseCh,
				resultCtx:       resultCtx,
			}

			// The cleanup blocks on doneCh, so doneCh must be closed on every
			// return path of this callback.
			doneCh := make(chan struct{})
			defer close(doneCh)
			resultCtx.registerCleanup(func() {
				container.Cancel()
				<-doneCh
			})

			select {
			case ctrCh <- container:
			case <-ctx.Done():
				// The caller may already have returned on mainCtx.Done(), in
				// which case nobody will ever receive from ctrCh. Tear the
				// container down instead of blocking forever (which would also
				// deadlock the cleanup registered above).
				container.Cancel()
				// NOTE(review): ctx is already done here, same as on the normal
				// path after a cancellation — confirm Release tolerates that.
				if err := bkContainer.Release(ctx); err != nil {
					return nil, err
				}
				return nil, ctx.Err()
			}

			// Keep the container alive until Cancel is called.
			<-container.releaseCh
			return nil, bkContainer.Release(ctx)
		})
		if err != nil {
			errCh <- err
		}
	}()

	select {
	case ctr := <-ctrCh:
		return ctr, nil
	case err := <-errCh:
		return nil, err
	case <-mainCtx.Done():
		return nil, mainCtx.Err()
	}
}
// Cancel marks the container as unavailable and tears it down: the container
// context is cancelled (when a cancel function is set) and the release channel
// is closed. The teardown runs only once; later calls just re-mark the
// container as unavailable.
func (c *Container) Cancel() {
	c.markUnavailable()

	teardown := func() {
		if cancel := c.containerCancel; cancel != nil {
			cancel()
		}
		close(c.releaseCh)
	}
	c.cancelOnce.Do(teardown)
}
// IsUnavailable reports whether the container has been marked as unavailable.
func (c *Container) IsUnavailable() bool {
	unavailable := c.isUnavailable.Load()
	return unavailable
}
// markUnavailable flags the container as unavailable. Nothing in this type
// ever clears the flag again.
func (c *Container) markUnavailable() {
	const unavailable = true
	c.isUnavailable.Store(unavailable)
}
// Exec runs a process described by cfg in the container, wired to the given
// stdio streams, and returns the error from running it.
//
// The first call on a container is treated as its init process: once that call
// returns, the container is marked unavailable. Any call that fails also marks
// the container unavailable.
func (c *Container) Exec(ctx context.Context, cfg *controllerapi.InvokeConfig, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
	if c.initStarted.CompareAndSwap(false, true) {
		// Only the first caller wins the swap; the container can't be used
		// after init exits.
		defer c.markUnavailable()
	}

	if err := exec(ctx, c.resultCtx, cfg, c.container, stdin, stdout, stderr); err != nil {
		// A failing process leaves the container unusable.
		c.markUnavailable()
		return err
	}
	return nil
}
// exec starts a process in ctr and waits for it to exit.
//
// The process configuration is derived from cfg and the stdio streams through
// resultCtx. While the process runs, a helper goroutine watches ctx and sends
// SIGKILL to the process if ctx is done first; the goroutine exits when exec
// returns.
//
// It returns the configuration error, a wrapped start error, or whatever
// proc.Wait reports.
func exec(ctx context.Context, resultCtx *ResultHandle, cfg *controllerapi.InvokeConfig, ctr gateway.Container, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
	processCfg, err := resultCtx.getProcessConfig(cfg, stdin, stdout, stderr)
	if err != nil {
		return err
	}

	proc, err := ctr.Start(ctx, processCfg)
	if err != nil {
		// Wrap instead of re-formatting: the message text is the same, but the
		// underlying cause stays reachable for callers inspecting the chain.
		return errors.Wrap(err, "failed to start container")
	}

	// doneCh tells the watcher below that the process has finished.
	doneCh := make(chan struct{})
	defer close(doneCh)
	go func() {
		select {
		case <-ctx.Done():
			// NOTE(review): Signal is called with an already-done ctx; confirm
			// the gateway client still delivers the signal in that case.
			if err := proc.Signal(ctx, syscall.SIGKILL); err != nil {
				logrus.Warnf("failed to kill process: %v", err)
			}
		case <-doneCh:
		}
	}()

	return proc.Wait()
}
|