File: invoke.go

package info (click to toggle)
docker-buildx 0.13.1%2Bds1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,356 kB
  • sloc: sh: 299; makefile: 87
file content (138 lines) | stat: -rw-r--r-- 3,384 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package build

import (
	"context"
	_ "crypto/sha256" // ensure digests can be computed
	"io"
	"sync"
	"sync/atomic"
	"syscall"

	controllerapi "github.com/docker/buildx/controller/pb"
	gateway "github.com/moby/buildkit/frontend/gateway/client"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

type Container struct {
	cancelOnce      sync.Once
	containerCancel func()
	isUnavailable   atomic.Bool
	initStarted     atomic.Bool
	container       gateway.Container
	releaseCh       chan struct{}
	resultCtx       *ResultHandle
}

func NewContainer(ctx context.Context, resultCtx *ResultHandle, cfg *controllerapi.InvokeConfig) (*Container, error) {
	mainCtx := ctx

	ctrCh := make(chan *Container)
	errCh := make(chan error)
	go func() {
		err := resultCtx.build(func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
			ctx, cancel := context.WithCancel(ctx)
			go func() {
				<-mainCtx.Done()
				cancel()
			}()

			containerCfg, err := resultCtx.getContainerConfig(ctx, c, cfg)
			if err != nil {
				return nil, err
			}
			containerCtx, containerCancel := context.WithCancel(ctx)
			defer containerCancel()
			bkContainer, err := c.NewContainer(containerCtx, containerCfg)
			if err != nil {
				return nil, err
			}
			releaseCh := make(chan struct{})
			container := &Container{
				containerCancel: containerCancel,
				container:       bkContainer,
				releaseCh:       releaseCh,
				resultCtx:       resultCtx,
			}
			doneCh := make(chan struct{})
			defer close(doneCh)
			resultCtx.registerCleanup(func() {
				container.Cancel()
				<-doneCh
			})
			ctrCh <- container
			<-container.releaseCh

			return nil, bkContainer.Release(ctx)
		})
		if err != nil {
			errCh <- err
		}
	}()
	select {
	case ctr := <-ctrCh:
		return ctr, nil
	case err := <-errCh:
		return nil, err
	case <-mainCtx.Done():
		return nil, mainCtx.Err()
	}
}

func (c *Container) Cancel() {
	c.markUnavailable()
	c.cancelOnce.Do(func() {
		if c.containerCancel != nil {
			c.containerCancel()
		}
		close(c.releaseCh)
	})
}

func (c *Container) IsUnavailable() bool {
	return c.isUnavailable.Load()
}

func (c *Container) markUnavailable() {
	c.isUnavailable.Store(true)
}

func (c *Container) Exec(ctx context.Context, cfg *controllerapi.InvokeConfig, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
	if isInit := c.initStarted.CompareAndSwap(false, true); isInit {
		defer func() {
			// container can't be used after init exits
			c.markUnavailable()
		}()
	}
	err := exec(ctx, c.resultCtx, cfg, c.container, stdin, stdout, stderr)
	if err != nil {
		// Container becomes unavailable if one of the processes fails in it.
		c.markUnavailable()
	}
	return err
}

func exec(ctx context.Context, resultCtx *ResultHandle, cfg *controllerapi.InvokeConfig, ctr gateway.Container, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
	processCfg, err := resultCtx.getProcessConfig(cfg, stdin, stdout, stderr)
	if err != nil {
		return err
	}
	proc, err := ctr.Start(ctx, processCfg)
	if err != nil {
		return errors.Errorf("failed to start container: %v", err)
	}

	doneCh := make(chan struct{})
	defer close(doneCh)
	go func() {
		select {
		case <-ctx.Done():
			if err := proc.Signal(ctx, syscall.SIGKILL); err != nil {
				logrus.Warnf("failed to kill process: %v", err)
			}
		case <-doneCh:
		}
	}()

	return proc.Wait()
}