File: controller.go

package info (click to toggle)
docker-buildx 0.19.3%2Bds1-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,852 kB
  • sloc: sh: 318; makefile: 73
file content (152 lines) | stat: -rw-r--r-- 4,463 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package local

import (
	"context"
	"io"
	"sync/atomic"

	"github.com/docker/buildx/build"
	cbuild "github.com/docker/buildx/controller/build"
	"github.com/docker/buildx/controller/control"
	controllererrors "github.com/docker/buildx/controller/errdefs"
	controllerapi "github.com/docker/buildx/controller/pb"
	"github.com/docker/buildx/controller/processes"
	"github.com/docker/buildx/util/desktop"
	"github.com/docker/buildx/util/ioset"
	"github.com/docker/buildx/util/progress"
	"github.com/docker/cli/cli/command"
	"github.com/moby/buildkit/client"
	"github.com/pkg/errors"
)

// NewLocalBuildxController returns a control.BuildxController backed by a
// localController that uses dockerCli, the fixed session ID "local" and a
// freshly created process manager.
//
// The ctx and logger arguments are accepted but not used by this constructor.
func NewLocalBuildxController(ctx context.Context, dockerCli command.Cli, logger progress.SubLogger) control.BuildxController {
	c := new(localController)
	c.dockerCli = dockerCli
	c.sessionID = "local"
	c.processes = processes.NewManager()
	return c
}

// buildConfig records the outcome of the most recent call to
// localController.Build that produced a result handle.
//
// resultCtx is the handle returned by cbuild.RunBuild; Invoke passes it to the
// process manager when starting a new process and Close calls Done on it.
// buildOptions are the options the build was started with; Inspect returns
// them to the caller.
type buildConfig struct {
	// TODO: these two structs should be merged
	// Discussion: https://github.com/docker/buildx/pull/1640#discussion_r1113279719
	resultCtx    *build.ResultHandle
	buildOptions *controllerapi.BuildOptions
}

// localController is the controller value returned by
// NewLocalBuildxController. It serves exactly one session.
//
// Fields:
//   - dockerCli is handed to cbuild.RunBuild by Build.
//   - sessionID is the only session ID the controller accepts; methods that
//     take a session ID reject any other value.
//   - buildConfig holds the result handle and options of the latest build
//     that returned a result handle.
//   - processes manages the processes started through Invoke.
//   - buildOnGoing is set for the duration of Build so that a second,
//     overlapping Build call is rejected.
type localController struct {
	dockerCli   command.Cli
	sessionID   string
	buildConfig buildConfig
	processes   *processes.Manager

	buildOnGoing atomic.Bool
}

// Build runs a build with the given options through cbuild.RunBuild.
//
// Only one build may be in flight at a time: if another Build call is still
// running, this call fails immediately with "build ongoing".
//
// Whenever RunBuild hands back a result handle (which it may do even when the
// build fails), the handle and the options are stored on the controller so
// that Invoke and Inspect can use them afterwards.
//
// On success it returns the controller's session ID, the solve response and
// the build inputs reported by RunBuild. On failure the first three results
// are zero values; if a result handle was returned alongside the error, the
// error is wrapped with controllererrors.WrapBuild, carrying the session ID
// and, when the error chain contains a *desktop.ErrorWithBuildRef, its Ref.
func (b *localController) Build(ctx context.Context, options *controllerapi.BuildOptions, in io.ReadCloser, progress progress.Writer) (string, *client.SolveResponse, *build.Inputs, error) {
	if acquired := b.buildOnGoing.CompareAndSwap(false, true); !acquired {
		return "", nil, nil, errors.New("build ongoing")
	}
	defer b.buildOnGoing.Store(false)

	resp, handle, inputs, err := cbuild.RunBuild(ctx, b.dockerCli, options, in, progress, true)

	// NOTE: RunBuild can return *build.ResultHandle even on error.
	hasHandle := handle != nil
	if hasHandle {
		b.buildConfig = buildConfig{resultCtx: handle, buildOptions: options}
	}

	if err == nil {
		return b.sessionID, resp, inputs, nil
	}

	// Errors are only wrapped when there is a result handle to refer back to.
	if hasHandle {
		ref := ""
		var withRef *desktop.ErrorWithBuildRef
		if errors.As(err, &withRef) {
			ref = withRef.Ref
		}
		err = controllererrors.WrapBuild(err, b.sessionID, ref)
	}
	return "", nil, nil, err
}

// ListProcesses returns the process infos reported by the controller's
// process manager. It fails with an "unknown session ID" error when sessionID
// is not the controller's own session ID.
func (b *localController) ListProcesses(ctx context.Context, sessionID string) (infos []*controllerapi.ProcessInfo, retErr error) {
	if known := sessionID == b.sessionID; !known {
		retErr = errors.Errorf("unknown session ID %q", sessionID)
		return nil, retErr
	}
	infos = b.processes.ListProcesses()
	return infos, nil
}

// DisconnectProcess asks the process manager to delete the process identified
// by pid and returns the manager's result. It fails with an "unknown session
// ID" error when sessionID is not the controller's own session ID.
func (b *localController) DisconnectProcess(ctx context.Context, sessionID, pid string) error {
	switch sessionID {
	case b.sessionID:
		return b.processes.DeleteProcess(pid)
	default:
		return errors.Errorf("unknown session ID %q", sessionID)
	}
}

// cancelRunningProcesses forwards to the process manager's
// CancelRunningProcesses.
func (b *localController) cancelRunningProcesses() {
	mgr := b.processes
	mgr.CancelRunningProcesses()
}

// Invoke connects the supplied IO streams to the process identified by pid.
//
// If the process manager does not know pid yet, a new process is started from
// the build result registered by the last Build call, using cfg; this fails
// with "no build result is registered" when no result is available. An
// "unknown session ID" error is returned when sessionID is not the
// controller's own session ID.
//
// After forwarding the IO, Invoke blocks until one of the following happens:
//   - the process's Done channel delivers a value, which is returned as is;
//   - the callback passed to ForwardIO fires, yielding an "io cancelled" error;
//   - ctx is done, in which case context.Cause(ctx) is returned.
func (b *localController) Invoke(ctx context.Context, sessionID string, pid string, cfg *controllerapi.InvokeConfig, ioIn io.ReadCloser, ioOut io.WriteCloser, ioErr io.WriteCloser) error {
	if sessionID != b.sessionID {
		return errors.Errorf("unknown session ID %q", sessionID)
	}

	proc, found := b.processes.Get(pid)
	if !found {
		// No such process yet: start one against the registered build result.
		result := b.buildConfig.resultCtx
		if result == nil {
			return errors.New("no build result is registered")
		}
		started, err := b.processes.StartProcess(pid, result, cfg)
		if err != nil {
			return err
		}
		proc = started
	}

	// Hand the caller's streams to the process. The callback closes the
	// channel so the select below can observe it.
	cancelled := make(chan struct{})
	streams := &ioset.In{Stdin: ioIn, Stdout: ioOut, Stderr: ioErr}
	proc.ForwardIO(streams, func(error) { close(cancelled) })

	select {
	case <-ctx.Done():
		return context.Cause(ctx)
	case err := <-proc.Done():
		return err
	case <-cancelled:
		return errors.Errorf("io cancelled")
	}
}

// Kill shuts the controller down by delegating to Close and returns whatever
// Close returns, so that a failure in Close is not silently dropped. The
// context argument is unused.
func (b *localController) Kill(context.Context) error {
	return b.Close()
}

// Close cancels the processes that are currently running and, when a build
// result is registered, calls Done on its handle. It always returns nil.
func (b *localController) Close() error {
	b.cancelRunningProcesses()

	if result := b.buildConfig.resultCtx; result != nil {
		result.Done()
	}

	// TODO: cancel ongoing builds?
	return nil
}

// List returns the session IDs known to the controller, which is always the
// single session ID the controller was created with. It never fails.
func (b *localController) List(ctx context.Context) (res []string, _ error) {
	res = make([]string, 1)
	res[0] = b.sessionID
	return res, nil
}

// Disconnect releases the controller's resources by delegating to Close and
// returns whatever Close returns, so that a failure in Close is not silently
// dropped. Neither ctx nor key is consulted; in particular key is not compared
// against the controller's session ID.
func (b *localController) Disconnect(ctx context.Context, key string) error {
	return b.Close()
}

// Inspect returns an InspectResponse carrying the build options stored by the
// last Build call that produced a result handle (nil if there was none). It
// fails with an "unknown session ID" error when sessionID is not the
// controller's own session ID.
func (b *localController) Inspect(ctx context.Context, sessionID string) (*controllerapi.InspectResponse, error) {
	if b.sessionID != sessionID {
		return nil, errors.Errorf("unknown session ID %q", sessionID)
	}
	opts := b.buildConfig.buildOptions
	resp := &controllerapi.InspectResponse{Options: opts}
	return resp, nil
}