File: processes.go

package info (click to toggle)
docker-buildx 0.13.1+ds1-3
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 2,356 kB
  • sloc: sh: 299; makefile: 87
file content (149 lines) | stat: -rw-r--r-- 4,139 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package processes

import (
	"context"
	"sync"
	"sync/atomic"

	"github.com/docker/buildx/build"
	"github.com/docker/buildx/controller/pb"
	"github.com/docker/buildx/util/ioset"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// Process provides methods to control a process.
// Instances are created by Manager.StartProcess.
type Process struct {
	// inEnd is the forwarder whose input side is switched by ForwardIO.
	inEnd         *ioset.Forwarder
	// invokeConfig is the configuration the process was started with;
	// it is reported by Manager.ListProcesses.
	invokeConfig  *pb.InvokeConfig
	// errCh is the channel exposed through Done.
	errCh         chan error
	// processCancel is invoked by the Manager to cancel the process
	// (DeleteProcess, CancelRunningProcesses, and when the process finishes).
	processCancel func()
	// serveIOCancel is the callback passed to the most recent ForwardIO call;
	// it is nil until ForwardIO has been called with a non-nil callback.
	serveIOCancel func()
}

// ForwardIO attaches the given reader/writer set to the process's io.
//
// ioCancelCallback may be nil. It is remembered on the Process and invoked
// the next time ForwardIO is called, right after the new io has been attached
// and before the new callback replaces it, which makes it a hook for cleaning
// up whatever was serving the previous io.
// NOTE(review): the callback was originally documented as being called "when
// the process closes the specified IO"; no such call is visible in this file,
// confirm against the callers.
func (p *Process) ForwardIO(in *ioset.In, ioCancelCallback func()) {
	p.inEnd.SetIn(in)

	// Let the previous io owner clean up before its callback is replaced.
	previous := p.serveIOCancel
	if previous != nil {
		previous()
	}

	p.serveIOCancel = ioCancelCallback
}

// Done exposes the receive-only side of the process's error channel.
// The process's exit result (an error, or nil) is sent on it when the
// process exits.
// TODO: change this to Wait()
func (p *Process) Done() <-chan error {
	var done <-chan error = p.errCh
	return done
}

// Manager manages a set of processes.
type Manager struct {
	// container holds the current *build.Container, or nothing until
	// StartProcess has created one.
	container atomic.Value
	// processes maps a process ID (string) to its *Process.
	processes sync.Map
}

// NewManager returns a pointer to a zero-valued Manager, which has no
// container and no registered processes.
func NewManager() *Manager {
	return new(Manager)
}

// Get looks up the process registered under id.
// The boolean result reports whether such a process exists; when it is
// false the returned *Process is nil.
func (m *Manager) Get(id string) (*Process, bool) {
	if entry, found := m.processes.Load(id); found {
		return entry.(*Process), true
	}
	return nil, false
}

// CancelRunningProcesses unregisters every process known to the Manager and
// then cancels each of them.
func (m *Manager) CancelRunningProcesses() {
	var cancels []func()

	// First pass: remember each cancel function and drop the entry.
	unregister := func(id, proc any) bool {
		cancels = append(cancels, proc.(*Process).processCancel)
		m.processes.Delete(id)
		return true
	}
	m.processes.Range(unregister)

	// Second pass: run the cancel functions outside of the Range callback.
	for i := range cancels {
		cancels[i]()
	}
}

// ListProcesses returns one pb.ProcessInfo (process ID and invoke config)
// for every process currently registered in the Manager.
// The result is nil when no process is registered.
func (m *Manager) ListProcesses() (res []*pb.ProcessInfo) {
	m.processes.Range(func(id, proc any) bool {
		info := &pb.ProcessInfo{
			ProcessID:    id.(string),
			InvokeConfig: proc.(*Process).invokeConfig,
		}
		res = append(res, info)
		return true
	})
	return res
}

// DeleteProcess unregisters the process with the given id and cancels it.
// It returns an error when no process is registered under id.
func (m *Manager) DeleteProcess(id string) error {
	entry, loaded := m.processes.LoadAndDelete(id)
	if loaded {
		entry.(*Process).processCancel()
		return nil
	}
	return errors.Errorf("unknown process %q", id)
}

// StartProcess starts a process in the container.
// When a container isn't available (i.e. first time invoking or the container has exited) or cfg.Rollback is set,
// this method will start a new container and run the process in it. Otherwise, this method starts a new process in the
// existing container.
//
// The process is registered under pid and removed again once it exits. Its
// exit result (nil or an error) is delivered on the channel returned by
// Process.Done. An error is returned only when a new container is needed and
// cannot be created; the underlying cause is preserved in the returned error.
func (m *Manager) StartProcess(pid string, resultCtx *build.ResultHandle, cfg *pb.InvokeConfig) (*Process, error) {
	// Get the target result to invoke a container from
	var ctr *build.Container
	if a := m.container.Load(); a != nil {
		ctr = a.(*build.Container)
	}
	if cfg.Rollback || ctr == nil || ctr.IsUnavailable() {
		// Unregister the processes of the old container synchronously so that
		// the process registered below can never be picked up (and cancelled)
		// by this cleanup. Only the cancellation itself runs in the background.
		var stale []func()
		m.processes.Range(func(key, value any) bool {
			stale = append(stale, value.(*Process).processCancel)
			m.processes.Delete(key)
			return true
		})
		go func() {
			for _, cancelStale := range stale {
				cancelStale()
			}
		}()
		// (Re)create a new container if this is rollback or first time to invoke a process.
		if ctr != nil {
			go ctr.Cancel() // Finish the existing container
		}
		var err error
		ctr, err = build.NewContainer(context.TODO(), resultCtx, cfg)
		if err != nil {
			// Wrap instead of formatting so callers can still inspect the cause.
			return nil, errors.Wrap(err, "failed to create container")
		}
		m.container.Store(ctr)
	}
	// [client(ForwardIO)] <-forwarder(switchable)-> [out] <-pipe-> [in] <- [process]
	in, out := ioset.Pipe()
	f := ioset.NewForwarder()
	f.PropagateStdinClose = false
	f.SetOut(&out)

	// Register process
	ctx, cancel := context.WithCancel(context.TODO())
	var cancelOnce sync.Once
	// processCancelFunc may be called from several places (Manager methods and
	// the goroutine below); the Once guarantees the teardown runs a single time.
	processCancelFunc := func() {
		cancelOnce.Do(func() {
			cancel()
			f.Close()
			in.Close()
			out.Close()
		})
	}
	p := &Process{
		inEnd:         f,
		invokeConfig:  cfg,
		processCancel: processCancelFunc,
		// Buffered so the goroutine below can deliver the result and exit even
		// when nobody is receiving from Done().
		errCh: make(chan error, 1),
	}
	m.processes.Store(pid, p)
	go func() {
		var err error
		if err = ctr.Exec(ctx, cfg, in.Stdin, in.Stdout, in.Stderr); err != nil {
			logrus.Debugf("process error: %v", err)
		}
		logrus.Debugf("finished process %s %v", pid, cfg.Entrypoint)
		// NOTE(review): this deletes by pid only; if the same pid has been
		// re-registered in the meantime the newer entry is removed — confirm
		// whether callers can reuse pids.
		m.processes.Delete(pid)
		processCancelFunc()
		p.errCh <- err
	}()

	return p, nil
}